Posted to commits@bluemarlin.apache.org by ra...@apache.org on 2022/01/07 17:01:19 UTC

[incubator-bluemarlin] branch main updated: experimented code for performance issue

This is an automated email from the ASF dual-hosted git repository.

radibnia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git


The following commit(s) were added to refs/heads/main by this push:
     new 79a5baf  experimented code for performance issue
     new ec0398b  Merge pull request #27 from rangaswamymr/main
79a5baf is described below

commit 79a5baf4f7e2e97f57c3f7ce6103fa22aab55f02
Author: Rangsawamy M R <ra...@huawei.com>
AuthorDate: Fri Jan 7 20:32:47 2022 +0530

    experimented code for performance issue
---
 .../main_spark_data_process.py                     | 180 +++++++++++++++
 .../main_spark_predict_es.py                       | 246 +++++++++++++++++++++
 .../dl_predictor_performance_issue/sparkesutil.py  | 156 +++++++++++++
 3 files changed, 582 insertions(+)

diff --git a/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/main_spark_data_process.py b/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/main_spark_data_process.py
new file mode 100644
index 0000000..15c020c
--- /dev/null
+++ b/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/main_spark_data_process.py
@@ -0,0 +1,180 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+# -*- coding: UTF-8 -*-
+import sys
+import yaml
+import argparse
+
+from pyspark import SparkContext, SparkConf, Row
+from pyspark.sql.functions import concat_ws, count, lit, col, udf, expr, collect_list, create_map, sum as sum_agg, \
+    struct, explode
+from pyspark.sql.types import IntegerType, StringType, ArrayType, MapType, FloatType, BooleanType
+from pyspark.sql import HiveContext
+from forecaster import Forecaster
+from sparkesutil import *
+from datetime import datetime, timedelta
+import pickle
+
+
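+# Merges a list of {hour: count_array} maps into one count array, summing the
+# "price_cat:count" entries across hours.
+# Hypothetical example: [{0: ['1:5', '2:3']}, {1: ['1:2']}] -> ['1:7', '2:3'].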
+def sum_count_array(hour_counts):
+    result_map = {}
+    for item in hour_counts:
+        for _, v in item.items():
+            for i in v:
+                key, value = i.split(':')
+                if key not in result_map:
+                    result_map[key] = 0
+                result_map[key] += int(value)
+    result = []
+    for key, value in result_map.items():
+        result.append(key + ":" + str(value))
+    return result
+
+
+def run(cfg, yesterday):
+    sc = SparkContext()
+    hive_context = HiveContext(sc)
+    forecaster = Forecaster(cfg)
+    sc.setLogLevel(cfg['log_level'])
+
+    # Reading the max bucket_id
+    dl_data_path = cfg['dl_predict_ready_path']
+    bucket_size = cfg['bucket_size']
+    bucket_step = cfg['bucket_step']
+    factdata_area_map = cfg['factdata']
+    distribution_table = cfg['distribution_table']
+    norm_table = cfg['norm_table']
+    dl_uckey_cluster_path = cfg['dl_uckey_cluster_path']
+
+    model_stats = get_model_stats_using_pickel(cfg)
+    if not model_stats:
+        sys.exit("dl_spark_cmd: " + "null model stats")
+
+    # Read dist
+    command = "SELECT DIST.uckey, DIST.ratio, DIST.cluster_uckey, DIST.price_cat FROM {} AS DIST ".format(
+        distribution_table)
+
+    df_dist = hive_context.sql(command)
+    df_dist = df_dist.repartition("uckey")
+    df_dist.cache()
+
+    # create day_list from yesterday for train_window
+    duration = model_stats['model']['duration']
+    day = datetime.strptime(yesterday, '%Y-%m-%d')
+    day_list = []
+    for _ in range(0, duration):
+        day_list.append(datetime.strftime(day, '%Y-%m-%d'))
+        day = day + timedelta(days=-1)
+    day_list.sort()
+
+    df_prediction_ready = None
+    df_uckey_cluster = None
+    start_bucket = 0
+    global i
+    i = sc.accumulator(0)
+
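+    # Scan factdata in ranges of bucket_step buckets per pass. Each pass aggregates
+    # the hourly count arrays into per-day maps, rolls uckeys up to cluster level and
+    # writes the result to parquet; the driver-side counter i selects overwrite on the
+    # first pass and append on later passes.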
+    while True:
+
+        end_bucket = min(bucket_size, start_bucket + bucket_step)
+
+        if start_bucket > end_bucket:
+            break
+
+        # Read factdata table
+        command = " SELECT FACTDATA.count_array, FACTDATA.day, FACTDATA.hour, FACTDATA.uckey FROM {} AS FACTDATA WHERE FACTDATA.bucket_id BETWEEN {} AND {}  and FACTDATA.day in {}".format(
+            factdata_area_map, str(start_bucket), str(end_bucket), tuple(day_list))
+
+        start_bucket = end_bucket + 1
+
+        df = hive_context.sql(command)
+        # decrease partitions
+        df = df.coalesce(200)
+
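+        # Keep only rows whose slot id (the second field of the comma-separated uckey)
+        # appears in the configured eligble_slot_ids list.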
+        if len(eligble_slot_ids) > 0:
+            df = df.filter(udf(lambda x: x.split(",")[1] in eligble_slot_ids, BooleanType())(df.uckey))
+        df = df.withColumn('hour_price_imp_map',
+                           expr("map(hour, count_array)"))
+
+        df = df.groupBy('uckey', 'day').agg(
+            collect_list('hour_price_imp_map').alias('hour_price_imp_map_list'))
+
+        df = df.withColumn('day_price_imp', udf(
+            sum_count_array, ArrayType(StringType()))(df.hour_price_imp_map_list)).drop('hour_price_imp_map_list')
+
+        df = df.withColumn('day_price_imp_map', expr(
+            "map(day, day_price_imp)"))
+
+        df = df.groupBy('uckey').agg(collect_list(
+            'day_price_imp_map').alias('day_price_imp_map_list'))
+
+        df = df.join(df_dist, on=['uckey'], how='inner')
+        df.cache()
+
+        # df_uckey_cluster keeps the ratio and cluster_key for only uckeys that are being processed
+
+        df_uckey_cluster = df.select(
+            'uckey', 'cluster_uckey', 'ratio', 'price_cat')
+
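+        # Aggregate to (cluster_uckey, price_cat) level and sum the collected day maps
+        # into a single time-series map 'ts' via sum_day_count_array.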
+        df = df.groupBy('cluster_uckey', 'price_cat').agg(
+            collect_list('day_price_imp_map_list').alias('cluster_day_price_imp_list'))
+        df = df.withColumn('ts', udf(sum_day_count_array,
+                                     ArrayType(MapType(StringType(), ArrayType(StringType()))))(
+            df.cluster_day_price_imp_list))
+
+        df = df.drop('cluster_day_price_imp_list')
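+        # Note: the output path is hard-coded here, overriding the dl_predict_ready_path
+        # value read from the config above.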
+        dl_data_path = 'dl_prediction_ready'
+
+        if i.value == 0:
+            df.coalesce(100).write.mode('overwrite').parquet(dl_data_path)
+            df_uckey_cluster.coalesce(100).write.mode('overwrite').parquet(dl_uckey_cluster_path)
+
+        else:
+            df.coalesce(100).write.mode('append').parquet(dl_data_path)
+            df_uckey_cluster.coalesce(100).write.mode('append').parquet(dl_uckey_cluster_path)
+
+        i += 1
+        df.unpersist()
+
+    sc.stop()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Post data process')
+    parser.add_argument('config_file')
+    parser.add_argument('yesterday', help='end date in yyyy-mm-dd format')
+    args = parser.parse_args()
+    # Load config file
+    try:
+        with open(args.config_file, 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+    except IOError as e:
+        print(
+            "Open config file unexpected error: I/O error({0}): {1}".format(e.errno, e.strerror))
+        sys.exit(1)
+    except Exception as e:
+        print("Unexpected error:{}".format(sys.exc_info()[0]))
+        raise
+
+    yesterday = args.yesterday
+
+    eligble_slot_ids = cfg['eligble_slot_ids']
+    yesterday = str(yesterday)
+
+    run(cfg, yesterday)
diff --git a/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/main_spark_predict_es.py b/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/main_spark_predict_es.py
new file mode 100644
index 0000000..05eb3a1
--- /dev/null
+++ b/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/main_spark_predict_es.py
@@ -0,0 +1,246 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# -*- coding: UTF-8 -*-
+import sys
+import json
+import time
+import yaml
+import argparse
+
+from pyspark import SparkContext, SparkConf, Row
+from pyspark.sql.functions import concat_ws, count, lit, col, udf, expr, collect_list, create_map, sum as sum_agg, \
+    struct, explode
+from pyspark.sql.types import IntegerType, StringType, ArrayType, MapType, FloatType, BooleanType, LongType
+from pyspark.sql import HiveContext, SQLContext
+from forecaster import Forecaster
+from sparkesutil import *
+import transform
+from datetime import datetime, timedelta
+import secrets
+from imscommon_dl.es.esclient import ESClient
+from imscommon_dl.es.es_predictions_dao import ESPredictionsDAO
+import pickle
+
+import logging
+from logging.config import fileConfig
+
+# Module-level logger used by post_index_alias below.
+logger = logging.getLogger(__name__)
+
+def multiply_each_value_of_map_with_ratio(day_prediction_map, ratio):
+    for k, v in day_prediction_map.items():
+        day_prediction_map[k] = v * ratio
+    return day_prediction_map
+
+
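+# Merges a list of day -> predicted-impression maps by summing values per day, then
+# formats each day's total in the ES record shape [{"total": n, "h1": n, "hr": 0}].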
+def get_day_count_map(map_list):
+    merge_map = {}
+    for item in map_list:
+        for k, v in item.items():
+            if k not in merge_map:
+                merge_map[k] = 0.0
+            merge_map[k] = merge_map[k] + v
+    date_map = {}
+    for date, impr in merge_map.items():
+        impr = int(impr)
+        count_list_map = [{"total": impr, "h1": impr, "hr": 0}]
+        date_map[date] = count_list_map
+    return date_map
+
+
+def add_uckey_to_json(uckey, date_count_map):
+    adv_type, slot, net_type, gender, age, pricing_type, residence_city_region, city_code_region = uckey.split(",")
+    prediction_json = {}
+    prediction_json["cc"] = ""
+    prediction_json["a"] = age
+    prediction_json["algo_id"] = "dl"
+    prediction_json["r"] = residence_city_region
+    prediction_json["t"] = net_type
+    prediction_json["ipl"] = city_code_region
+    prediction_json["records"] = []
+    prediction_json["si"] = slot
+    prediction_json["g"] = gender
+    prediction_json["cpoi"] = ""
+    prediction_json["m"] = adv_type
+    prediction_json["rpoi"] = ""
+    prediction_json["pm"] = pricing_type
+    prediction_json["uckey"] = uckey
+    prediction_json["predictions"] = date_count_map
+    return json.dumps(prediction_json)
+
+
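+# Buffers prediction JSON documents and writes them to Elasticsearch in bulk batches
+# of cfg['bulk_buffer_size'], sleeping 1-4 seconds between batches to throttle writes.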
+def push_data_es(x):
+    es_predictions_bulk_buff = []
+    for it in x:
+        it_predictions_str = it["hits"]
+        es_predictions_bulk_buff.append(it_predictions_str)
+        if len(es_predictions_bulk_buff) >= cfg['bulk_buffer_size']:
+            es_predictions_dao.bulk_index(ucdocs=es_predictions_bulk_buff)
+            es_predictions_bulk_buff = []
+            time.sleep(secrets.choice([1, 2, 3, 4]))
+    if len(es_predictions_bulk_buff) > 0:
+        es_predictions_dao.bulk_index(ucdocs=es_predictions_bulk_buff)
+
+
+def post_index_alias(esclient, index_new_name, index_alias):
+    old_index = esclient.get_alias(index_alias)
+    mesg = "removing old es index: {}, new es index: {}, es index alias name: {}".format(
+        old_index, index_new_name, index_alias)
+    logger.info(mesg)
+    esclient.remove_add_alias(old_index, index_new_name, index_alias)
+
+
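+# Explodes the uckey's day -> records map into one (uckey, day, impr) row per day for
+# the HDFS parquet copy of the predictions.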
+def get_preditction_in_hdfs_formate(df):
+    df = df.select(df.uckey, explode(df.day_count_map)).withColumnRenamed("key", "day")
+    impr_udf = udf(lambda x: x[0]["total"], IntegerType())
+    df = df.withColumn('impr', impr_udf(df.value)).select("uckey", "day", "impr")
+    return df
+
+
+def run(cfg, yesterday, serving_url):
+    sc = SparkContext()
+    hive_context = HiveContext(sc)
+    sqlcontext = SQLContext(sc)
+    forecaster = Forecaster(cfg)
+    sc.setLogLevel(cfg['log_level'])
+
+    dl_data_path = cfg['dl_predict_ready_path']
+    dl_uckey_cluster_path = cfg['dl_uckey_cluster_path']
+    distribution_table = cfg['distribution_table']
+    norm_table = cfg['norm_table']
+
+    model_stats = get_model_stats_using_pickel(cfg)
+    if not model_stats:
+        sys.exit("dl_spark_cmd: " + "null model stats")
+
+    # Read dist
+    command = "SELECT DIST.uckey, DIST.ratio, DIST.cluster_uckey, DIST.price_cat FROM {} AS DIST ".format(
+        distribution_table)
+
+    df_dist = hive_context.sql(command)
+    df_dist = df_dist.repartition("uckey")
+    df_dist.cache()
+
+    # Read norm table
+    command = "SELECT uckey AS cluster_uckey, price_cat, a__n,a_1_n,a_2_n,a_3_n,a_4_n,a_5_n,a_6_n, t_UNKNOWN_n,t_3G_n,t_4G_n,t_WIFI_n,t_2G_n, g__n, g_g_f_n, g_g_m_n, g_g_x_n, price_cat_1_n, price_cat_2_n, price_cat_3_n, si_vec_n FROM {} ".format(
+        norm_table)
+    df_norm = hive_context.sql(command)
+
+    # create day_list from yesterday for train_window
+    duration = model_stats['model']['duration']
+    day = datetime.strptime(yesterday, '%Y-%m-%d')
+    day_list = []
+    for _ in range(0, duration):
+        day_list.append(datetime.strftime(day, '%Y-%m-%d'))
+        day = day + timedelta(days=-1)
+    day_list.sort()
+
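+    # Read back the intermediate per-cluster time series and the uckey-to-cluster
+    # mapping written by main_spark_data_process.py.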
+    df = sqlcontext.read.parquet(dl_data_path)
+    df_uckey_cluster = sqlcontext.read.parquet(dl_uckey_cluster_path)
+
+    df = df.groupBy('cluster_uckey', 'price_cat').agg(collect_list('ts').alias('ts_list'))
+    df = df.withColumn('ts',
+                       udf(sum_day_count_array, ArrayType(MapType(StringType(), ArrayType(StringType()))))(df.ts_list))
+    df = df.drop('ts_list')
+
+    df = df.join(df_norm, on=['cluster_uckey', 'price_cat'], how='inner')
+    df = df.join(df_uckey_cluster, on=[
+        'cluster_uckey', 'price_cat'], how='inner')
+
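+    # For this performance experiment the input is narrowed to a single hard-coded
+    # uckey pattern; the prediction UDF (which calls serving_url) is then applied per row.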
+    df = df.where(df.uckey.like('%native,b6le0s4qo8,4G,g_f,5,CPC,,1156320000%'))
+    predictor_udf = udf(transform.predict_daily_uckey(days=day_list,
+                                                      serving_url=serving_url, forecaster=forecaster,
+                                                      model_stats=model_stats, columns=df.columns, config=cfg),
+                        MapType(StringType(), FloatType()))
+
+    df = df.withColumn('day_prediction_map',
+                       predictor_udf(struct([df[name] for name in df.columns])))
+
+    df = df.select('cluster_uckey', 'price_cat', 'day_prediction_map', 'ratio', 'uckey')
+
+    mul_udf = udf(multiply_each_value_of_map_with_ratio, MapType(StringType(), FloatType()))
+    df = df.withColumn('day_prediction_map', mul_udf(df.day_prediction_map, df.ratio))
+
+    df = df.groupBy('uckey').agg(collect_list('day_prediction_map').alias('map_list'))
+
+    count_map_udf = udf(get_day_count_map, MapType(StringType(), ArrayType(MapType(StringType(), LongType()))))
+    df = df.withColumn('day_count_map', count_map_udf(df.map_list))
+    df = df.select(df.uckey, df.day_count_map)
+
+    df.cache()
+    hdfs_df = df
+
+    to_json_udf = udf(add_uckey_to_json, StringType())
+    df = df.withColumn('hits', to_json_udf(df.uckey, df.day_count_map)).select("hits")
+
+    hdfs_df = get_preditction_in_hdfs_formate(hdfs_df)
+    hdfs_df.show()
+    hdfs_df.coalesce(hdfs_write_threads).write.mode('overwrite').partitionBy("day").parquet(cfg["hdfs_prefix_path"])
+
+    sc.stop()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Run DL predictions and save results')
+    parser.add_argument('config_file')
+    parser.add_argument('yesterday', help='end date in yyyy-mm-dd format')
+    args = parser.parse_args()
+    # Load config file
+    try:
+        with open(args.config_file, 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+    except IOError as e:
+        print(
+            "Open config file unexpected error: I/O error({0}): {1}".format(e.errno, e.strerror))
+        sys.exit(1)
+    except Exception as e:
+        print("Unexpected error:{}".format(sys.exc_info()[0]))
+        raise
+
+    yesterday = args.yesterday
+    es_json_dir = cfg["es_json_dir"]
+    index_data = dict()
+    with open(es_json_dir + '/put_predictor_index_css.json', 'r') as myfile:
+        index_data['css'] = myfile.read()
+
+
+    eligble_slot_ids = cfg['eligble_slot_ids']
+    yesterday = str(yesterday)
+    es_prediction_index = cfg["es_predictions_index"] + "_" + yesterday
+    es_prediction_type = cfg['es_predictions_type']
+    refresh_index_wait_time = cfg["refresh_index_wait_time"]
+    es_write_threads = cfg["es_write_threads"]
+    hdfs_write_threads = cfg["hdfs_write_threads"]
+    serving_url = cfg["serving_url"]
+
+    es_cfg = dict()
+    es_cfg['es_mode'] = cfg["es_mode"]
+    es_cfg['css_url'] = cfg["css_url"]
+    es_cfg['pem_path'] = cfg['pem_path']
+
+    predictions_type = dict()
+    predictions_type['css'] = cfg['es_predictions_type']
+
+    es_predictions = " value removed"
+    es_predictions_dao = "value removed"
+    cfg["signKey"] = 'provide value'
+    cfg["sign_prefix"] = 'provide value'
+    run(cfg, yesterday, serving_url)
+    mesg = "dl_spark_cmd: prediction saved in ES index: {}, and one copy saved in HDFS at path: {}".format(
+        es_prediction_index, cfg["hdfs_prefix_path"])
+    print(mesg)
+
diff --git a/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/sparkesutil.py b/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/sparkesutil.py
new file mode 100644
index 0000000..321aea8
--- /dev/null
+++ b/Model/predictor-dl-model/experiments/dl_predictor_performance_issue/sparkesutil.py
@@ -0,0 +1,156 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from imscommon_dl.model.ucdoc import UCDoc
+from imscommon_dl.model.ucday import UCDay
+from imscommon_dl.model.uchour import UCHour
+from imscommon_dl.es.ims_esclient import ESClient
+import yaml
+import pickle
+
+
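+# Converts a list of "price_cat:count" strings for one hour into an UCHour-style dict
+# with buckets h0..h3 plus the hour index.
+# Hypothetical example: build_dict_from_counts(['0:12', '2:3'], 5) ->
+#   {'h_index': 5, 'h0': {'h': 'h0', 't': 12.0}, 'h2': {'h': 'h2', 't': 3.0}, ...}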
+def build_dict_from_counts(h, h_index):
+    dict = {}
+    dict['h_index'] = h_index
+    h0 = {'h': 'h0', 't': 0}
+    h1 = {'h': 'h1', 't': 0}
+    h2 = {'h': 'h2', 't': 0}
+    h3 = {'h': 'h3', 't': 0}
+
+    for _ in h:
+        s = _.split(':')
+        index = int(s[0])
+        count = float(s[1])
+        if index == 0:
+            h0['t'] = count
+        elif index == 1:
+            h1['t'] = count
+        elif index == 2:
+            h2['t'] = count
+        elif index == 3:
+            h3['t'] = count
+
+    dict['h0'] = h0
+    dict['h1'] = h1
+    dict['h2'] = h2
+    dict['h3'] = h3
+
+    return dict
+
+
+def convert_day_hour_counts_to_ucdoc(uckey, day_hour_counts):
+    ucdoc = UCDoc.build_from_concat_string(uckey)
+    for day_map in day_hour_counts:
+        if len(day_map) > 0:
+            # There is only one key here.
+            day = next(iter(day_map.keys()))
+            hour_count_map = day_map[day]
+            for hour_map in hour_count_map:
+                if len(hour_map) > 0:
+                    hour = next(iter(hour_map.keys()))
+                    h = hour_map[hour]
+                    records = ucdoc.records
+                    if day not in records:
+                        records[day] = UCDay(str(day))
+                    dict = build_dict_from_counts(h, hour)
+                    uchour = UCHour.buildv1(dict)
+                    records[day].hours[hour] = uchour
+    return ucdoc
+
+
+def convert_predictions_json_to_sorted_ucdays(predictions):
+    ucdays = []
+    days = predictions.keys()
+    sorted_days = sorted(days)
+    for day in sorted_days:
+        ucday = UCDay(day)
+        for i in range(0, 24):
+            hour_json = predictions[day][i]
+            ucday.hours[i] = UCHour.buildv2(hour_json)
+        ucdays.append(ucday)
+    return ucdays
+
+
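+# Fetches the model document matching model.name and model.version from the ES model
+# index; raises if there is not exactly one match.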
+def get_model_stats(cfg, model_name, model_version):
+    es = ESClient(cfg['es_host'], cfg['es_port'],
+                  cfg['es_model_index'], cfg['es_model_type'])
+    body = {
+        "query": {"bool": {"must": [
+            {"match": {
+                "model.name": model_name
+            }},
+            {"match": {
+                "model.version": model_version
+            }}
+        ]}}
+    }
+    doc = es.search(body)
+    if doc is None or len(doc) != 1:
+        raise Exception(
+            'model/version {}/{} not valid'.format(model_name, model_version))
+    return doc[0]
+
+
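+# Builds the model_stats dict (model name, version, duration, train/predict windows
+# plus training statistics) from the DL model YAML config and the tf_statistics
+# pickle file; returns None if either file cannot be read.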
+def get_model_stats_using_pickel(cfg):
+    model_config_path = cfg["config_path"]
+    pkl_path = cfg["tf_statistics_path"]
+    try:
+        path = model_config_path + "/config_dl_model.yml"
+        with open(path, 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+            try:
+                with open(pkl_path, 'rb') as handle:
+                    pk = pickle.load(handle)
+                result = {'model': {
+                    'name': cfg['trainer']['name'], 'version': cfg['save_model']['model_version'],
+                    'duration': cfg['tfrecorder_reader']['duration'], 'train_window': cfg['save_model']['train_window'],
+                    'predict_window': cfg['trainer']['predict_window']}, 'stats': pk['stats']}
+                return result
+            except Exception as e:
+                print("pickle file path:", pkl_path, "exception:", e)
+    except Exception as e:
+        print("dl config path: ", path, "   exception:  ", e)
+    return None
+
+
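+# Flattens nested lists of {day: count_array} maps into a single map (returned as a
+# one-element list), summing the per-day count arrays with add_count_arrays.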
+def sum_day_count_array(day_count_arrays):
+    result_map = {}
+    for day_count_array in day_count_arrays:
+        for item in day_count_array:
+            for day, v in item.items():
+                if not day:
+                    continue
+                if day not in result_map:
+                    result_map[day] = []
+                result_map[day] = add_count_arrays(result_map[day], v)
+    return [result_map]
+
+
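+# Element-wise sum of two "price_cat:count" arrays.
+# Hypothetical example: add_count_arrays(['1:3', '2:5'], ['1:2']) -> ['1:5', '2:5'].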
+def add_count_arrays(ca1, ca2):
+    result_map = {}
+    for i in ca1 + ca2:
+        key, value = i.split(':')
+        if key not in result_map:
+            result_map[key] = 0
+        result_map[key] += int(value)
+    result = []
+    for key, value in result_map.items():
+        result.append(key + ":" + str(value))
+    return result