Posted to dev@bluemarlin.apache.org by ra...@apache.org on 2021/11/10 22:41:04 UTC
[incubator-bluemarlin] branch main updated: update
This is an automated email from the ASF dual-hosted git repository.
radibnia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git
The following commit(s) were added to refs/heads/main by this push:
new b5cc16c update
new ccbbf99 Merge pull request #15 from radibnia77/main
b5cc16c is described below
commit b5cc16cee37b2d1cbb1c8ff2503791eb4d1a9628
Author: Reza <re...@yahoo.com>
AuthorDate: Wed Nov 10 14:40:19 2021 -0800
update
remove residency
remove region mapping for ipl
---
Model/predictor-dl-model/VERSION.md | 7 +-
.../dags/dlpm_data_prep_dag_05142021_1500.py | 76 -
.../7day_variance_uckey_weight_in_slotid.py | 63 +
.../predictor_dl_model/config.yml | 119 +-
.../predictor_dl_model/pipeline/main_cluster.py | 52 +-
.../pipeline/main_filter_si_region_bucket.py | 57 +-
.../predictor_dl_model/pipeline/main_norm.py | 13 +-
.../predictor_dl_model/pipeline/main_ts.py | 47 +-
.../predictor_dl_model/pipeline/transform.py | 4 +-
Model/predictor-dl-model/predictor_dl_model/run.sh | 19 +-
.../predictor_dl_model/trainer/client_rest_dl2.py | 161 +-
.../predictor-dl-model/scripts/check_stable_si.py | 32 +
.../scripts/import_factdata_files_1.py | 61 +
.../predictor-dl-model/tests/pipeline/test_base.py | 12 +-
.../troubleshooting/check_model.py | 288 +++
.../troubleshooting/client_rest_dl2.py | 318 ++++
Processes/dlpredictor/VERSION.md | 5 +-
Processes/dlpredictor/conf/config.yml | 23 +-
.../dlpredictor/dlpredictor/main_build_ipl_dist.py | 144 --
Processes/dlpredictor/dlpredictor/main_spark_es.py | 84 +-
Processes/dlpredictor/dlpredictor/show_config.py | 6 +-
.../experiments/elasticsearch-hadoop-6.8.0.jar | Bin 0 -> 1040608 bytes
.../experiments/scripts/test_spark_es_big_write.py | 1836 ++++++++++++++++++++
.../experiments/scripts/test_spark_es_write.py | 59 +
.../lib/predictor_dl_model-1.6.0-py2.7.egg | Bin 120549 -> 148644 bytes
Processes/dlpredictor/run.sh | 15 +-
.../si_traffic_prediction_check_by_agg.py | 59 +-
27 files changed, 2935 insertions(+), 625 deletions(-)
diff --git a/Model/predictor-dl-model/VERSION.md b/Model/predictor-dl-model/VERSION.md
index 96d2437..5cc2f09 100644
--- a/Model/predictor-dl-model/VERSION.md
+++ b/Model/predictor-dl-model/VERSION.md
@@ -9,4 +9,9 @@
### 1.6
1. Add region and IPL features
-2. Add TAG to config file. The whole set of tmp tables is named by product_tag and pipeline_tag. The user no longer needs to review the names of those tables.
\ No newline at end of file
+2. Add TAG to config file. The whole set of tmp tables is named by product_tag and pipeline_tag. The user no longer needs to review the names of those tables.
+
+### 1.7
+1. Remove residency from UCKey. The residency value is replaced by an empty string; the number of commas stays the same.
+2. Remove region mapping for IPL.
+3. Remove normalization of residency and IPL in main_norm.
\ No newline at end of file
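As a plain-Python illustration of the 1.7 UCKey change (a hypothetical sketch, not code from this commit): the residency field at index 6 of the comma-separated UCKey is blanked rather than removed, so the comma count seen by downstream parsers is unchanged.

    # hypothetical sketch mirroring drop_residency in main_filter_si_region_bucket.py
    def blank_residency(uckey):
        # field 6 is residency; replace its value with an empty string
        return ','.join(v if i != 6 else '' for i, v in enumerate(uckey.split(',')))

    uckey = 'native,l03493p0r3,4G,g_m,4,CPM,23,78'
    print(blank_residency(uckey))  # native,l03493p0r3,4G,g_m,4,CPM,,78
    assert blank_residency(uckey).count(',') == uckey.count(',')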
diff --git a/Model/predictor-dl-model/dags/dlpm_data_prep_dag_05142021_1500.py b/Model/predictor-dl-model/dags/dlpm_data_prep_dag_05142021_1500.py
deleted file mode 100644
index b4e5a0e..0000000
--- a/Model/predictor-dl-model/dags/dlpm_data_prep_dag_05142021_1500.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0.html
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from airflow import DAG
-import datetime as dt
-from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
-from datetime import timedelta
-
-default_args = {
- 'owner': 'dlpm',
- 'depends_on_past': False,
- 'start_date': dt.datetime(2020, 9, 21),
- 'retries': 0 # ,
- # 'retry_delay': timedelta(minutes=5),
-}
-
-dag = DAG(
- 'dlpm_data_prep_05142021_1500',
- default_args=default_args,
- schedule_interval=None,
-)
-
-
-def sparkOperator(
- file,
- task_id,
- **kwargs
-):
- return SparkSubmitOperator(
- application='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/{}'.format(file),
- application_args=['/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/config-archive/config_05142021_1500.yml'],
- conn_id='spark_default',
- conf={'spark.driver.maxResultSize': '8g'},
- driver_memory='16G',
- executor_cores=3,
- num_executors=5,
- executor_memory='16G',
- task_id=task_id,
- dag=dag,
- **kwargs
- )
-
-
-main_filter_si_region_bucket = sparkOperator('main_filter_si_region_bucket.py', 'main_filter_si_region_bucket')
-
-main_ts = sparkOperator('main_ts.py',
- 'main_ts',
- py_files='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/transform.py')
-
-main_cluster = sparkOperator('main_cluster.py', 'main_cluster')
-
-main_distribution = sparkOperator('main_distribution.py', 'main_distribution')
-
-main_norm = sparkOperator('main_norm.py',
- 'main_norm',
- py_files='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/transform.py')
-
-main_tfrecords = sparkOperator('main_tfrecords.py',
- 'main_tfrecords',
- jars='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/spark-tensorflow-connector_2.11-1.15.0.jar')
-
-
-main_filter_si_region_bucket >> main_ts >> main_cluster >> main_distribution >> main_norm >> main_tfrecords
diff --git a/Model/predictor-dl-model/experiments/7day_variance_uckey_weight_in_slotid.py b/Model/predictor-dl-model/experiments/7day_variance_uckey_weight_in_slotid.py
new file mode 100644
index 0000000..d5d9731
--- /dev/null
+++ b/Model/predictor-dl-model/experiments/7day_variance_uckey_weight_in_slotid.py
@@ -0,0 +1,63 @@
+from pyspark import SparkContext, SparkConf,SQLContext
+from pyspark.sql.functions import count, lit, col, udf, expr, collect_list, explode
+from pyspark.sql.types import IntegerType, StringType, MapType, ArrayType, BooleanType,FloatType
+from pyspark.sql import HiveContext
+from datetime import datetime, timedelta
+from pyspark.sql.functions import broadcast
+
+def _list_to_map(count_array):
+ count_map = {}
+ for item in count_array:
+ key_value = item.split(':')
+ count_map[key_value[0]] = key_value[1]
+ return count_map
+
+
+def add_count_map(df):
+ # Convert count_array to count_map
+ list_to_map_udf = udf(_list_to_map, MapType(
+ StringType(), StringType(), False))
+ df = df.withColumn('count_map', list_to_map_udf(df.count_array))
+ return df
+
+def variance(plist):
+ l=len(plist)
+ ex=sum(plist)/l
+ ex2=sum([i*i for i in plist])/l
+ return ex2-ex*ex
+
+
+
+query="select count_array,day,uckey from factdata where day in ('2020-05-15','2020-05-14','2020-05-13','2020-05-12','2020-05-11','2020-05-10','2020-05-09')"
+sc = SparkContext()
+hive_context = HiveContext(sc)
+
+df = hive_context.sql(query)
+df = add_count_map(df)
+
+df = df.select('uckey', 'day', explode(df.count_map)).withColumnRenamed("value", "impr_count")
+
+df = df.withColumn('impr_count', udf(lambda x: int(x), IntegerType())(df.impr_count))
+df = df.groupBy('uckey', 'day').sum('impr_count').withColumnRenamed("sum(impr_count)", 'impr_count')
+
+
+split_uckey_udf = udf(lambda x: x.split(","), ArrayType(StringType()))
+df = df.withColumn('col', split_uckey_udf(df.uckey))
+df = df.select('uckey','impr_count', 'day', df.col[1]).withColumnRenamed("col[1]", 'slot_id')
+
+
+df_slot=df.select('slot_id','impr_count', 'day')
+df_slot=df_slot.groupBy('slot_id','day').sum('impr_count').withColumnRenamed("sum(impr_count)", "impr_total")
+bc_df_slot = broadcast(df_slot)
+
+df_new = df.join(bc_df_slot, on=["slot_id",'day'],how="inner")
+
+df_new = df_new.withColumn('percent', udf(lambda x,y: (x*100)/y, FloatType())(df_new.impr_count,df_new.impr_total))
+
+
+df2=df_new.groupBy("uckey").agg(collect_list('percent').alias('percent'))
+df2 = df2.withColumn('var', udf(lambda x: variance(x), FloatType())(df2.percent))
+df2.select("uckey","var").orderBy(["var"],ascending=False).show(300,truncate=False)
+df2.cache()
+print("% uckeys having varience > 0.01 ",df2.filter((df2.var <= 0.01)).count()*100/df2.count())
+
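For reference, the variance helper above is a population variance over each uckey's per-day percentage of its slot's traffic; a standalone check with made-up percentages:

    # population variance: E[X^2] - (E[X])^2
    def variance(plist):
        l = len(plist)
        ex = sum(plist) / l
        ex2 = sum(i * i for i in plist) / l
        return ex2 - ex * ex

    print(variance([30.0, 30.0, 30.0]))  # 0.0 -> the uckey's share of its slot is stable
    print(variance([10.0, 30.0, 50.0]))  # ~266.67 -> the share swings day to day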
diff --git a/Model/predictor-dl-model/predictor_dl_model/config.yml b/Model/predictor-dl-model/predictor_dl_model/config.yml
index 864dc32..fbc2ff2 100644
--- a/Model/predictor-dl-model/predictor_dl_model/config.yml
+++ b/Model/predictor-dl-model/predictor_dl_model/config.yml
@@ -1,54 +1,51 @@
product_tag: 'dlpm'
-pipeline_tag: '08092021_1200' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
-factdata_table_name: 'factdata_09202021' #factdata_hq_09222020
+pipeline_tag: '111021_no_residency_no_mapping' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
+factdata_table_name: 'factdata_10202021' #factdata_hq_09222020
log:
- level: 'WARN' # log level for spark and app
+ level: 'warn' # log level for spark and app
pipeline:
config_table: '{product_tag}_{pipeline_tag}_config'
filter: # This is for data filtering- si and region
percentile: 10 # This is for filtering traffic less than 1/10 of average traffic
- region_mapping_table: 'ipl_region_mapping_09282021' #region_mapping_01012020
output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
init_start_bucket: 0
bucket_size: 1000
bucket_step: 100
new_bucket_size: 10
condition: ''
- new_si_list: ['15e9ddce941b11e5bdec00163e291137',
- '66bcd2720e5011e79bc8fa163e05184e',
- '7b0d7b55ab0c11e68b7900163e3e481d',
- 'a8syykhszz',
- 'w3wx3nv9ow5i97',
- 'x2fpfbm8rt',
- '17dd6d8098bf11e5bdec00163e291137',
- '5cd1c663263511e6af7500163e291137',
- '68bcd2720e5011e79bc8fa163e05184e',
- '71bcd2720e5011e79bc8fa163e05184e',
- 'a290af82884e11e5bdec00163e291137',
- 'a47eavw7ex',
- 'b6le0s4qo8',
- 'd4d7362e879511e5bdec00163e291137',
- 'd971z9825e',
- 'd9jucwkpr3',
- 'e351de37263311e6af7500163e291137',
- 'f1iprgyl13',
- 'j1430itab9wj3b',
- 'k4werqx13k',
- 'l03493p0r3',
- 'l2d4ec6csv',
- 'p7gsrebd4m',
- 's4z85pd1h8',
- 'w9fmyd5r0i',
- 'x0ej5xhk60kjwq',
- 'z041bf6g4s']
+ new_si_list: ['a47eavw7ex',
+ '66bcd2720e5011e79bc8fa163e05184e',
+ 'x0ej5xhk60kjwq',
+ 'l03493p0r3',
+ '7b0d7b55ab0c11e68b7900163e3e481d',
+ 'b6le0s4qo8',
+ 'e351de37263311e6af7500163e291137',
+ 'a290af82884e11e5bdec00163e291137',
+ '68bcd2720e5011e79bc8fa163e05184e',
+ 'f1iprgyl13',
+ 'w9fmyd5r0i',
+ 'w3wx3nv9ow5i97',
+ 'd971z9825e',
+ 'l2d4ec6csv',
+ 'z041bf6g4s',
+ '71bcd2720e5011e79bc8fa163e05184e',
+ '5cd1c663263511e6af7500163e291137',
+ 'x2fpfbm8rt',
+ 'd9jucwkpr3',
+ 'k4werqx13k',
+ 'j1430itab9wj3b',
+ 'a8syykhszz',
+ 's4z85pd1h8',
+ '17dd6d8098bf11e5bdec00163e291137',
+ 'd4d7362e879511e5bdec00163e291137']
time_series: # This is done on whole bucketized data
input_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
conditions: []
- yesterday: "2020-06-10" # data is used for training from -<prepare_past_days> to -1(yesterday)
- prepare_past_days: 102
+ yesterday: "2021-07-21" # data is used for training from -<prepare_past_days> to -1(yesterday)
+ prepare_past_days: 82 # this should be equal to tfrecorder_reader.duration
bucket_size: 10 # maximum number of buckets to process starting from 0
bucket_step: 1 # size of bucket batch that is processed in one iteration
output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
@@ -73,36 +70,32 @@ pipeline:
'a': ['','1','2','3','4','5','6'],
'g':['','g_f','g_m','g_x'],
't':['UNKNOWN','3G','4G','WIFI','2G'],
- 'r':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
- 'ipl':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'si':[
- '15e9ddce941b11e5bdec00163e291137',
- '66bcd2720e5011e79bc8fa163e05184e',
- '7b0d7b55ab0c11e68b7900163e3e481d',
- 'a8syykhszz',
- 'w3wx3nv9ow5i97',
- 'x2fpfbm8rt',
- '17dd6d8098bf11e5bdec00163e291137',
- '5cd1c663263511e6af7500163e291137',
- '68bcd2720e5011e79bc8fa163e05184e',
- '71bcd2720e5011e79bc8fa163e05184e',
- 'a290af82884e11e5bdec00163e291137',
'a47eavw7ex',
- 'b6le0s4qo8',
- 'd4d7362e879511e5bdec00163e291137',
- 'd971z9825e',
- 'd9jucwkpr3',
- 'e351de37263311e6af7500163e291137',
- 'f1iprgyl13',
- 'j1430itab9wj3b',
- 'k4werqx13k',
- 'l03493p0r3',
- 'l2d4ec6csv',
- 'p7gsrebd4m',
- 's4z85pd1h8',
- 'w9fmyd5r0i',
- 'x0ej5xhk60kjwq',
- 'z041bf6g4s']
+ '66bcd2720e5011e79bc8fa163e05184e',
+ 'x0ej5xhk60kjwq',
+ 'l03493p0r3',
+ '7b0d7b55ab0c11e68b7900163e3e481d',
+ 'b6le0s4qo8',
+ 'e351de37263311e6af7500163e291137',
+ 'a290af82884e11e5bdec00163e291137',
+ '68bcd2720e5011e79bc8fa163e05184e',
+ 'f1iprgyl13',
+ 'w9fmyd5r0i',
+ 'w3wx3nv9ow5i97',
+ 'd971z9825e',
+ 'l2d4ec6csv',
+ 'z041bf6g4s',
+ '71bcd2720e5011e79bc8fa163e05184e',
+ '5cd1c663263511e6af7500163e291137',
+ 'x2fpfbm8rt',
+ 'd9jucwkpr3',
+ 'k4werqx13k',
+ 'j1430itab9wj3b',
+ 'a8syykhszz',
+ 's4z85pd1h8',
+ '17dd6d8098bf11e5bdec00163e291137',
+ 'd4d7362e879511e5bdec00163e291137']
}
holidays: ['2019-11-09', '2019-11-10', '2019-11-11', '2019-11-25', '2019-11-26', '2019-11-27','2019-11-28', '2019-12-24','2019-12-25', '2019-12-26','2019-12-31', '2020-01-01', '2020-01-02', '2020-01-19','2020-01-20', '2020-01-21', '2020-01-22', '2020-01-23', '2020-01-24', '2020-01-25', '2020-02-08']
tfrecords:
@@ -121,7 +114,7 @@ tfrecorder_reader:
end: '' # help="Effective end date. Data past the end is dropped"
corr_backoffset: 0 # default=0, type=int, help="Offset for correlation calculation"
batch_size: 11880 # batch size of examples in tfrecord
- duration: 90 # time series length, This has to less or equal prepare_past_days
+ duration: 82 # time series length; this has to be less than or equal to prepare_past_days
tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
trainer:
@@ -149,7 +142,7 @@ trainer:
max_steps: 20000 # type=int, help="Stop training after max steps"
save_from_step: 100 # type=int, help="Save model on each evaluation (10 evals per epoch), starting from this step"
predict_window: 10 # default=3, type=int, help="Number of days to predict"
- back_offset: 20
+ back_offset: 0 # don't change it.
save_model:
table: '{product_tag}_{pipeline_tag}_model_stat'
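The '{product_tag}_{pipeline_tag}_...' values throughout this config are placeholders that resolve_placeholder fills in before the pipeline runs; a minimal sketch of that substitution, assuming plain str.format-style replacement (the real implementation may differ):

    # hypothetical sketch of placeholder resolution for the table names above
    cfg = {'product_tag': 'dlpm', 'pipeline_tag': '111021_no_residency_no_mapping'}
    template = '{product_tag}_{pipeline_tag}_tmp_ts'
    print(template.format(**cfg))  # dlpm_111021_no_residency_no_mapping_tmp_ts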
diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_cluster.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_cluster.py
index 129f6d6..580a419 100644
--- a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_cluster.py
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_cluster.py
@@ -62,7 +62,8 @@ def estimate_number_of_non_dense_clusters(df, median_popularity_of_dense, cluste
no_of_items_in_a_cluster = median_popularity_of_dense / median_non_dense_p
- no_of_cluster = df.filter('sparse=True').count() * 1.0 / no_of_items_in_a_cluster / 3.0
+ no_of_cluster = df.filter('sparse=True').count() * \
+ 1.0 / no_of_items_in_a_cluster / 3.0
# Ceiling for num virtual clusters set at a ratio of the number of dense uckeys.
dense_count = df.filter(df.sparse == False).count()
@@ -96,7 +97,7 @@ def agg_ts(mlist):
def agg_on_uckey_price_cat(df):
- column_names = ['ts', 'a', 'g', 't', 'si', 'r', 'ipl']
+ column_names = ['ts', 'a', 'g', 't', 'si', 'ipl']
agg_exprs = [collect_list(col).alias(col) for col in column_names]
df = df.groupBy('uckey', 'price_cat').agg(*agg_exprs)
@@ -146,8 +147,9 @@ def denoise(df, percentile):
df = df.withColumn('nonzero_p', udf(
lambda ts: 1.0 * sum(ts) / len([_ for _ in ts if _ != 0]) if len(
[_ for _ in ts if _ != 0]) != 0 else 0.0, FloatType())(df.ts))
- df = df.withColumn('nonzero_sd', udf(lambda ts: stdev([_ for _ in ts if _ !=0]))(df.ts))
-
+ df = df.withColumn('nonzero_sd', udf(
+ lambda ts: stdev([_ for _ in ts if _ != 0]))(df.ts))
+
df = df.withColumn('ts', udf(lambda ts, nonzero_p: [i if i and i > (nonzero_p / percentile) else 0 for i in ts],
ArrayType(IntegerType()))(df.ts, df.nonzero_p))
df = df.withColumn('ts', udf(lambda ts, nonzero_sd: [i if i and i < (nonzero_sd * 2) else 0 for i in ts],
@@ -169,17 +171,19 @@ def run(hive_context, cluster_size_cfg, input_table_name,
# Read factdata table
command = """
- SELECT ts, price_cat, uckey, a, g, t, si, r, ipl FROM {}
+ SELECT ts, price_cat, uckey, a, g, t, si, ipl FROM {}
""".format(input_table_name)
# DataFrame[uckey: string, price_cat: string, ts: array<int>, a: string, g: string, t: string, si: string, r: string]
df = hive_context.sql(command)
# add imp
- df = df.withColumn('imp', udf(lambda ts: sum([_ for _ in ts if _]), IntegerType())(df.ts))
+ df = df.withColumn('imp', udf(lambda ts: sum(
+ [_ for _ in ts if _]), IntegerType())(df.ts))
# add popularity = mean
- df = df.withColumn('p', udf(lambda ts: sum([_ for _ in ts if _])/(1.0 * len(ts)), FloatType())(df.ts))
+ df = df.withColumn('p', udf(lambda ts: sum(
+ [_ for _ in ts if _])/(1.0 * len(ts)), FloatType())(df.ts))
# add normalized popularity = mean_n
df, _ = transform.normalize_ohe_feature(df, ohe_feature='p')
@@ -206,27 +210,36 @@ def run(hive_context, cluster_size_cfg, input_table_name,
df_sparse = df.filter(df.sparse == True)
# Calculate the total impressions for each ad unit
- df_sparse = df_sparse.withColumn('si_imp_total', fn.sum('imp').over(Window.partitionBy('si')))
+ df_sparse = df_sparse.withColumn(
+ 'si_imp_total', fn.sum('imp').over(Window.partitionBy('si')))
# Calculate total impressions of the sparse uckeys.
imp_total = df_sparse.agg(fn.sum('imp')).collect()[0][0]
+ # this covers the case where there are no sparse uckeys
+ if imp_total is None:
+ imp_total = 0
+
# Calculate the number of virtual clusters for each si based on the number of
# virtual clusters, the total impressions of the sparse uckeys, and the total
# impressions of each si.
imp_per_cluster = imp_total/number_of_virtual_clusters
- df_sparse = df_sparse.withColumn('si_num_cluster', udf(lambda si_imp_total: int((si_imp_total + imp_per_cluster - 1) / imp_per_cluster))(df_sparse.si_imp_total))
+ df_sparse = df_sparse.withColumn('si_num_cluster', udf(lambda si_imp_total: int(
+ (si_imp_total + imp_per_cluster - 1) / imp_per_cluster))(df_sparse.si_imp_total))
# Create a tie breaker column for assigning sparse uckeys from the same si
# to different virtual clusters.
- df_sparse = df_sparse.withColumn('tie_breaker', udf(lambda num_clusters: random.randint(0, num_clusters - 1))(df_sparse.si_num_cluster))
+ df_sparse = df_sparse.withColumn('tie_breaker', udf(
+ lambda num_clusters: random.randint(0, num_clusters - 1))(df_sparse.si_num_cluster))
# Assign a cluster number to the sparse uckeys based on si and the tie breaker.
- df_sparse = df_sparse.withColumn('cn', dense_rank().over(Window.orderBy('si', 'tie_breaker')))
+ df_sparse = df_sparse.withColumn(
+ 'cn', dense_rank().over(Window.orderBy('si', 'tie_breaker')))
# Add the same columns for the dense uckeys so they can be recombined.
+ # si_imp_total is calculated for df_sparse.
df_dense = df.filter(df.sparse == False)
- df_dense = df_dense.withColumn('si_imp_total', df_dense['imp'])
+ df_dense = df_dense.withColumn('si_imp_total', lit(0))
df_dense = df_dense.withColumn('si_num_cluster', lit(1))
df_dense = df_dense.withColumn('tie_breaker', lit(0))
df_dense = df_dense.withColumn('cn', lit(0))
@@ -239,24 +252,31 @@ def run(hive_context, cluster_size_cfg, input_table_name,
__save_as_table(df, pre_cluster_table_name, hive_context, True)
# Change the uckey for sparse uckeys their cluster number.
- df = df.withColumn('uckey', udf(lambda uckey, cn, sparse: str(cn) if sparse else uckey, StringType())(df.uckey, df.cn, df.sparse))
+ df = df.withColumn('uckey', udf(lambda uckey, cn, sparse: str(
+ cn) if sparse else uckey, StringType())(df.uckey, df.cn, df.sparse))
df = agg_on_uckey_price_cat(df)
# add imp
- df = df.withColumn('imp', udf(lambda ts: sum([_ for _ in ts if _]), IntegerType())(df.ts))
+ df = df.withColumn('imp', udf(lambda ts: sum(
+ [_ for _ in ts if _]), IntegerType())(df.ts))
# add popularity = mean
- df = df.withColumn('p', udf(lambda ts: sum([_ for _ in ts if _])/(1.0 * len(ts)), FloatType())(df.ts))
+ df = df.withColumn('p', udf(lambda ts: sum(
+ [_ for _ in ts if _])/(1.0 * len(ts)), FloatType())(df.ts))
# add normalized popularity = mean_n
df, _ = transform.normalize_ohe_feature(df, ohe_feature='p')
- df = df.filter(udf(lambda p_n, ts: not is_spare(datapoints_th_clusters, -sys.maxsize - 1)(p_n, ts), BooleanType())(df.p_n, df.ts))
+ df = df.filter(udf(lambda p_n, ts: not is_spare(
+ datapoints_th_clusters, -sys.maxsize - 1)(p_n, ts), BooleanType())(df.p_n, df.ts))
# denoising uckeys: remove some datapoints of the uckey. keep the data between upper and lower bound
df = denoise(df, percentile)
+ # drop tmp columns
+ df = df.drop('si_imp_total')
+
__save_as_table(df, output_table_name, hive_context, True)
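The si_num_cluster expression above is an integer ceiling division, spreading each si's sparse impressions over just enough virtual clusters; a standalone check with made-up totals:

    # ceil(si_imp_total / imp_per_cluster), as computed for si_num_cluster
    def num_clusters(si_imp_total, imp_per_cluster):
        return int((si_imp_total + imp_per_cluster - 1) / imp_per_cluster)

    print(num_clusters(1000, 400))  # 3 -> two full clusters plus a partial one
    print(num_clusters(800, 400))   # 2 -> exact fit, no extra cluster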
diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_filter_si_region_bucket.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_filter_si_region_bucket.py
index 0c38793..4ba384f 100644
--- a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_filter_si_region_bucket.py
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_filter_si_region_bucket.py
@@ -56,42 +56,10 @@ def __save_as_table(df, table_name, hive_context, create_table):
def drop_residency(df):
- new_uckey = udf(lambda uckey: ','.join([v for i, v in enumerate(uckey.split(',')) if i != 6]))
- df = df.withColumn('_uckey', new_uckey(df.uckey)).drop('uckey').withColumnRenamed('_uckey', 'uckey')
- return df
-
-
-def modify_ipl(df, mapping):
- def __udf_method(_uckey_str):
- uckey = list(_uckey_str.split(','))
- uckey_ipl = uckey[-1]
- if uckey_ipl in mapping:
- uckey_ipl = mapping[uckey_ipl]
- uckey[-1] = uckey_ipl
- uckey_new = ','.join(uckey)
- return uckey_new
-
- new_uckey = udf(__udf_method, StringType())
- df = df.withColumn('uckey', new_uckey(df.uckey))
- df = df.drop('virtual').drop('original')
- return df
-
-
-def modify_residency(df, mapping_df):
- df = df.withColumn('original', split(df['uckey'], ',').getItem(6).cast(IntegerType()))
- df = df.join(mapping_df, on=['original'], how='left')
-
- def __udf_method(_uckey_str, _r, _virtual_region):
- uckey = list(_uckey_str.split(','))
- uckey_residency = uckey[:-2]
- uckey_ipl = uckey[-1]
- new_residency = _virtual_region
- uckey_new = ','.join(uckey_residency + ['' if new_residency is None else new_residency] + [uckey_ipl])
- return uckey_new
-
- new_uckey = udf(__udf_method, StringType())
- df = df.withColumn('uckey', new_uckey(df.uckey, df.original, df.virtual))
- df = df.drop('virtual').drop('original')
+ new_uckey = udf(lambda uckey: ','.join(
+ [v if i != 6 else '' for i, v in enumerate(uckey.split(','))]))
+ df = df.withColumn('_uckey', new_uckey(df.uckey)).drop(
+ 'uckey').withColumnRenamed('_uckey', 'uckey')
return df
@@ -105,16 +73,7 @@ def assign_new_bucket_id(df, n):
return df
-def run(hive_context, conditions, factdata_table_name, output_table_name, region_mapping_table, init_start_bucket, bucket_size, bucket_step, new_bucket_size, new_si_set):
-
- # ts will be counts from yesterday-(past_days) to yesterday
- mapping_df = hive_context.sql('SELECT old AS original, new AS virtual FROM {}'.format(region_mapping_table))
- mapping_list = mapping_df.collect()
- mapping = {}
- for row in mapping_list:
- original = row['original']
- virtual = row['virtual']
- mapping[original] = virtual
+def run(hive_context, conditions, factdata_table_name, output_table_name, init_start_bucket, bucket_size, bucket_step, new_bucket_size, new_si_set):
start_bucket = init_start_bucket
first_round = True
@@ -142,8 +101,7 @@ def run(hive_context, conditions, factdata_table_name, output_table_name, region
_udf = udf(lambda x: x.split(',')[1] in new_si_set, BooleanType())
df = df.filter(_udf(df.uckey))
- df = modify_ipl(df, mapping)
- # df = modify_residency(df, mapping_df)
+ df = drop_residency(df)
df = assign_new_bucket_id(df, new_bucket_size)
@@ -179,7 +137,6 @@ if __name__ == "__main__":
factdata_table_name = cfg['factdata_table_name']
output_table_name = cfg_filter['output_table_name']
- region_mapping_table = cfg_filter['region_mapping_table']
init_start_bucket = cfg_filter['init_start_bucket']
bucket_size = cfg_filter['bucket_size']
bucket_step = cfg_filter['bucket_step']
@@ -190,7 +147,7 @@ if __name__ == "__main__":
new_si_set = set(new_si_list)
run(hive_context=hive_context, conditions=conditions, factdata_table_name=factdata_table_name,
- output_table_name=output_table_name, region_mapping_table=region_mapping_table, init_start_bucket=init_start_bucket,
+ output_table_name=output_table_name, init_start_bucket=init_start_bucket,
bucket_size=bucket_size, bucket_step=bucket_step, new_bucket_size=new_bucket_size, new_si_set=new_si_set)
sc.stop()
diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_norm.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_norm.py
index 4e2fb24..9d40aa8 100644
--- a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_norm.py
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_norm.py
@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-
+
# http://www.apache.org/licenses/LICENSE-2.0.html
# Unless required by applicable law or agreed to in writing, software
@@ -94,16 +94,14 @@ def run(sc, hive_context, columns, input_table_name, output_table_name, yesterda
model_info['holidays_norm'] = holidays_norm
model_stats['holiday_stats'] = [hol_avg, hol_std]
- # Read factdata table
command = """
- SELECT uckey, ts, price_cat, a, g, t, si, r, p, ipl FROM {}
+ SELECT uckey, ts, price_cat, a, g, t, si, p FROM {}
""".format(input_table_name)
# DataFrame[uckey: string, price_cat: string, ts: array<int>, a: string, g: string, t: string, si: string, r: string, ipl: string]
df = hive_context.sql(command)
si_list = []
- r_list = []
ipl_list = []
removed_columns = []
for feature_name, feature_value_list in columns.items():
@@ -119,8 +117,6 @@ def run(sc, hive_context, columns, input_table_name, output_table_name, yesterda
ohe_feature_n = ohe_feature + '_n'
if feature_name == 'si':
si_list.append(ohe_feature_n)
- if feature_name == 'r':
- r_list.append(ohe_feature_n)
if feature_name == 'ipl':
ipl_list.append(ohe_feature_n)
df, stats = transform.normalize_ohe_feature(
@@ -136,11 +132,6 @@ def run(sc, hive_context, columns, input_table_name, output_table_name, yesterda
lambda *x: [_ for _ in x], ArrayType(FloatType()))(*si_list))
df = df.drop(*si_list)
- if len(r_list):
- df = df.withColumn('r_vec_n', udf(
- lambda *x: [_ for _ in x], ArrayType(FloatType()))(*r_list))
- df = df.drop(*r_list)
-
if len(ipl_list):
df = df.withColumn('ipl_vec_n', udf(
lambda *x: [_ for _ in x], ArrayType(FloatType()))(*ipl_list))
diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_ts.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_ts.py
index 4e5d3e7..67ac681 100644
--- a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_ts.py
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_ts.py
@@ -47,11 +47,11 @@ def __save_as_table(df, table_name, hive_context, create_table):
uckey string,
price_cat string,
ts array<int>,
+ ts_ver array<int>,
a string,
g string,
t string,
si string,
- r string,
ipl string
)
""".format(table_name)
@@ -61,11 +61,11 @@ def __save_as_table(df, table_name, hive_context, create_table):
df.select('uckey',
'price_cat',
'ts',
+ 'ts_ver',
'a',
'g',
't',
'si',
- 'r',
'ipl'
).write.format('hive').option("header", "true").option("encoding", "UTF-8").mode('append').insertInto(table_name)
@@ -76,7 +76,7 @@ def normalize(mlist):
return [0 if std == 0 else (item-avg)/(std) for item in mlist], avg, std
-def run(hive_context, conditions, factdata_table_name, yesterday, past_days, output_table_name, bucket_size, bucket_step):
+def run(hive_context, conditions, input_table, yesterday, past_days, predict_window, output_table, bucket_size, bucket_step):
# ts will be counts from yesterday-(past_days) to yesterday
@@ -87,6 +87,13 @@ def run(hive_context, conditions, factdata_table_name, yesterday, past_days, out
day = day + timedelta(days=-1)
day_list.sort()
+ day = datetime.strptime(yesterday, '%Y-%m-%d')
+ ver_day_list = []
+ for _ in range(0, predict_window):
+ day = day + timedelta(days=1)
+ ver_day_list.append(datetime.strftime(day, '%Y-%m-%d'))
+ ver_day_list.sort()
+
start_bucket = 0
first_round = True
while True:
@@ -99,7 +106,7 @@ def run(hive_context, conditions, factdata_table_name, yesterday, past_days, out
# Read factdata table
command = """
SELECT count_array, day, hour, uckey FROM {} WHERE bucket_id BETWEEN {} AND {}
- """.format(factdata_table_name, str(start_bucket), str(end_bucket))
+ """.format(input_table, str(start_bucket), str(end_bucket))
if len(conditions) > 0:
command = command + " AND {}".format(' AND '.join(conditions))
@@ -130,20 +137,21 @@ def run(hive_context, conditions, factdata_table_name, yesterday, past_days, out
# [Row(uckey='native,l03493p0r3,4G,g_m,4,CPM,23,78', price_cat='1', ts_list_map=[{'2019-11-02': '13'}])]
# This method handles missing dates by injecting nan
- df = transform.calculate_time_series(df, day_list)
+ df = transform.calculate_time_series(df, 'ts', day_list)
# [Row(uckey='native,l03493p0r3,4G,g_m,4,CPM,23,78', price_cat='1', ts_list_map=[{'2019-11-02': '13'}], ts=[nan, nan, nan, nan, nan, nan, nan, nan, nan, 2.6390573978424072])]
+ df = transform.calculate_time_series(df, 'ts_ver', ver_day_list)
+
# Log processor code to know the index of features
# v = concat_ws(UCDoc.uckey_delimiter, df.adv_type 0 , df.slot_id 1 , df.net_type 2 , df.gender 3 , df.age 4 ,
# df.price_dev 5 , df.pricing_type 6 , df.residence_city 7 , df.ip_city_code 8 )
df = df.withColumn('a', transform.add_feature_udf(4)(df.uckey))
df = df.withColumn('si', transform.add_feature_udf(1)(df.uckey))
- df = df.withColumn('r', transform.add_feature_udf(7)(df.uckey))
- df = df.withColumn('ipl', transform.add_feature_udf(8)(df.uckey))
+ df = df.withColumn('ipl', transform.add_feature_udf(7)(df.uckey))
df = df.withColumn('t', transform.add_feature_udf(2)(df.uckey))
df = df.withColumn('g', transform.add_feature_udf(3)(df.uckey))
- __save_as_table(df, output_table_name, hive_context, first_round)
+ __save_as_table(df, output_table, hive_context, first_round)
first_round = False
@@ -159,21 +167,22 @@ if __name__ == "__main__":
resolve_placeholder(cfg)
cfg_log = cfg['log']
- cfg = cfg['pipeline']['time_series']
+ cfg_pipeline = cfg['pipeline']['time_series']
sc = SparkContext()
hive_context = HiveContext(sc)
sc.setLogLevel(cfg_log['level'])
- yesterday = cfg['yesterday']
- prepare_past_days = cfg['prepare_past_days']
- output_table_name = cfg['output_table_name']
- bucket_size = cfg['bucket_size']
- bucket_step = cfg['bucket_step']
- input_table_name = cfg['input_table_name']
- conditions = cfg['conditions']
-
- run(hive_context, conditions, input_table_name,
- yesterday, prepare_past_days, output_table_name, bucket_size, bucket_step)
+ yesterday = cfg_pipeline['yesterday']
+ prepare_past_days = cfg_pipeline['prepare_past_days']
+ output_table = cfg_pipeline['output_table_name']
+ bucket_size = cfg_pipeline['bucket_size']
+ bucket_step = cfg_pipeline['bucket_step']
+ input_table = cfg_pipeline['input_table_name']
+ conditions = cfg_pipeline['conditions']
+ predict_window = cfg['trainer']['predict_window']
+
+ run(hive_context=hive_context, conditions=conditions, input_table=input_table,
+ yesterday=yesterday, past_days=prepare_past_days, predict_window=predict_window, output_table=output_table, bucket_size=bucket_size, bucket_step=bucket_step)
sc.stop()
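The new ts_ver column holds a verification series for the predict_window days immediately after yesterday, next to the training series built from the past_days ending at yesterday; a minimal sketch of the two date ranges (hypothetical, shortened values):

    from datetime import datetime, timedelta

    yesterday, past_days, predict_window = '2021-07-21', 3, 2
    day = datetime.strptime(yesterday, '%Y-%m-%d')

    day_list = sorted((day - timedelta(days=i)).strftime('%Y-%m-%d') for i in range(past_days))
    ver_day_list = [(day + timedelta(days=i + 1)).strftime('%Y-%m-%d') for i in range(predict_window)]

    print(day_list)      # ['2021-07-19', '2021-07-20', '2021-07-21'] -> feeds ts
    print(ver_day_list)  # ['2021-07-22', '2021-07-23'] -> feeds ts_ver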
diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/transform.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/transform.py
index 9431497..e433c01 100644
--- a/Model/predictor-dl-model/predictor_dl_model/pipeline/transform.py
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/transform.py
@@ -39,7 +39,7 @@ def add_count_map(df):
# This method replaces zeros with nan and injects nans for non-existing days
-def calculate_time_series(df, day_list):
+def calculate_time_series(df, col_name, day_list):
def _helper(ts_list_map):
ts_map = {}
result = []
@@ -61,7 +61,7 @@ def calculate_time_series(df, day_list):
return result
_udf = udf(_helper, ArrayType(IntegerType()))
- df = df.withColumn('ts', _udf(df.ts_list_map))
+ df = df.withColumn(col_name, _udf(df.ts_list_map))
return df
diff --git a/Model/predictor-dl-model/predictor_dl_model/run.sh b/Model/predictor-dl-model/predictor_dl_model/run.sh
index 61dfffd..5005517 100644
--- a/Model/predictor-dl-model/predictor_dl_model/run.sh
+++ b/Model/predictor-dl-model/predictor_dl_model/run.sh
@@ -1,10 +1,7 @@
#!/bin/bash
-if true
+if false
then
- # simple call
- # spark-submit pipeline/show_config.py config.yml
-
spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/show_config.py config.yml
fi
@@ -12,27 +9,18 @@ fi
#This part might be optional if uckeys have stable slot-id with region data
if false
then
- # simple call
- # spark-submit pipeline/main_filter_si_region_bucket.py config.yml
-
spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/main_filter_si_region_bucket.py config.yml
fi
#Preparing ts data and save the results as <config.pipeline.time_series.ts_tmp_table_name>
if false
-then
- # simple call
- # spark-submit pipeline/main_ts.py config.yml
-
+then
spark-submit --master yarn --py-files pipeline/transform.py --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/main_ts.py config.yml
fi
#Run outlier filter and save the results as <config.pipeline.time_series.{product_tag}_{pipeline_tag}_tmp_outlier>
if false
-then
- # simple call
- # spark-submit pipeline/main_outlier.py config.yml
-
+then
spark-submit --master yarn --py-files pipeline/transform.py --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/main_outlier.py config.yml
fi
@@ -58,7 +46,6 @@ fi
#Preparing normalization
if false
then
- #spark-submit pipeline/main_norm.py config.yml
spark-submit --master yarn --py-files pipeline/transform.py --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/main_norm.py config.yml
fi
diff --git a/Model/predictor-dl-model/predictor_dl_model/trainer/client_rest_dl2.py b/Model/predictor-dl-model/predictor_dl_model/trainer/client_rest_dl2.py
index 47b1ae6..a2ce5f6 100644
--- a/Model/predictor-dl-model/predictor_dl_model/trainer/client_rest_dl2.py
+++ b/Model/predictor-dl-model/predictor_dl_model/trainer/client_rest_dl2.py
@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-
+
# http://www.apache.org/licenses/LICENSE-2.0.html
# Unless required by applicable law or agreed to in writing, software
@@ -28,16 +28,19 @@ import requests
import numpy as np
from typing import List
import pandas as pd
-import datetime, math
+import datetime
+import math
import sys
# from predictor_dl_model.pipeline.util import get_dow
+
def get_start_end(records_len, train_window, forward_offset):
start = records_len - train_window - forward_offset
end = records_len - forward_offset
return start, end
+
def get_dow(day_list):
dow_list = []
for day in day_list:
@@ -49,6 +52,7 @@ def get_dow(day_list):
cos_list = [math.cos(x / week_period) for x in dow_list]
return (sin_list, cos_list)
+
def lag_indexes(day_list):
"""
Calculates indexes for 3, 6, 9, 12 months backward lag for the given date range
@@ -65,9 +69,7 @@ def lag_indexes(day_list):
dates = date_range - offset
return pd.Series(data=base_index[dates].fillna(-1).astype(np.int64).values, index=date_range)
- return [lag(pd.DateOffset(months =m)) for m in (1,2)]
-
-
+ return [lag(pd.DateOffset(days=m)) for m in (30, 60)]
def make_pred_input(duration, train_window, predict_window, full_record_exp, x_hits, dow, lagged_ix, pf_age, pf_si,
@@ -109,11 +111,9 @@ def make_pred_input(duration, train_window, predict_window, full_record_exp, x_h
x_lagged, y_lagged = norm_lagged_hits[:
train_window], norm_lagged_hits[train_window:]
-
# Combine all page features into single tensor
stacked_features = np.stack([page_popularity, quarter_autocorr])
-
flat_ucdoc_features = np.concatenate([pf_age, pf_si, pf_network, pf_gender, pf_price_cat, stacked_features],
axis=0) # pf_region
ucdoc_features = np.expand_dims(flat_ucdoc_features, 0)
@@ -143,10 +143,14 @@ def make_pred_input(duration, train_window, predict_window, full_record_exp, x_h
def get_predict_post_body(model_stats, day_list, day_list_cut, page_ix, pf_age, pf_si, pf_network, pf_gender,
full_record, hits, pf_price_cat, predict_day_list, forward_offset):
+
train_window = model_stats['model']['train_window'] # comes from cfg
predict_window = model_stats['model']['predict_window'] # comes from cfg
x_hits = np.log(np.add(hits, 1)).tolist() # ln + 1
full_record_exp = np.log(np.add(full_record, 1)).tolist()
+
+ # derive duration from the record length (previously read from model_stats['model']['duration'])
+ duration = len(full_record_exp)
if len(day_list_cut) != train_window + predict_window:
raise Exception('day_list_cut and train_window + predict_window do not match. {} {} {}'.format(
@@ -162,12 +166,10 @@ def get_predict_post_body(model_stats, day_list, day_list_cut, page_ix, pf_age,
# not used in the model (but we should keep it)
page_popularity = np.mean(full_record)
page_popularity = (
- page_popularity - model_stats['stats']['page_popularity'][0]) / \
- model_stats['stats']['page_popularity'][1]
+ page_popularity - model_stats['stats']['page_popularity'][0]) / \
+ model_stats['stats']['page_popularity'][1]
quarter_autocorr = 1
- duration = model_stats['model']['duration']
-
# x_hits, x_features, norm_x_hits, x_lagged, y_features, mean, std, flat_ucdoc_features, page_ix
truex, timex, normx, laggedx, timey, normmean, normstd, pgfeatures, pageix = make_pred_input(duration,
train_window,
@@ -197,13 +199,13 @@ def get_predict_post_body(model_stats, day_list, day_list_cut, page_ix, pf_age,
def predict(serving_url, model_stats, day_list, ucdoc_attribute_map, forward_offset):
page_ix = ucdoc_attribute_map['page_ix']
- pf_age = [ucdoc_attribute_map['a__n'], ucdoc_attribute_map['a_1_n'],ucdoc_attribute_map['a_2_n'], ucdoc_attribute_map['a_3_n'],
- ucdoc_attribute_map['a_4_n'], ucdoc_attribute_map['a_5_n'], ucdoc_attribute_map['a_6_n']]
+ pf_age = [ucdoc_attribute_map['a__n'], ucdoc_attribute_map['a_1_n'], ucdoc_attribute_map['a_2_n'], ucdoc_attribute_map['a_3_n'],
+ ucdoc_attribute_map['a_4_n'], ucdoc_attribute_map['a_5_n'], ucdoc_attribute_map['a_6_n']]
pf_si = ucdoc_attribute_map['si_vec_n']
- pf_network = [ ucdoc_attribute_map['t_2G_n'], ucdoc_attribute_map['t_3G_n'],ucdoc_attribute_map['t_4G_n'],
- ucdoc_attribute_map['t_UNKNOWN_n'],ucdoc_attribute_map['t_WIFI_n'], ]
+ pf_network = [ucdoc_attribute_map['t_2G_n'], ucdoc_attribute_map['t_3G_n'], ucdoc_attribute_map['t_4G_n'],
+ ucdoc_attribute_map['t_UNKNOWN_n'], ucdoc_attribute_map['t_WIFI_n'], ]
pf_gender = [ucdoc_attribute_map['g__n'], ucdoc_attribute_map['g_g_f_n'], ucdoc_attribute_map['g_g_m_n'], ucdoc_attribute_map['g_g_x_n']]
- pf_price_cat= [ucdoc_attribute_map['price_cat_1_n'], ucdoc_attribute_map['price_cat_2_n'],ucdoc_attribute_map['price_cat_3_n']]
+ pf_price_cat = [ucdoc_attribute_map['price_cat_1_n'], ucdoc_attribute_map['price_cat_2_n'], ucdoc_attribute_map['price_cat_3_n']]
full_record = np.expm1(ucdoc_attribute_map['ts_n'])
# days_list is sorted from past to present [2018-01-01, 2018-01-02, ...]
@@ -218,7 +220,7 @@ def predict(serving_url, model_stats, day_list, ucdoc_attribute_map, forward_off
day_list_cut = day_list[start:end]
predict_day = [datetime.datetime.strptime(day_list_cut[-1], "%Y-%m-%d").date() + datetime.timedelta(days=x + 1) for
x in range(prediction_window)]
- predict_day_list= [x.strftime("%Y-%m-%d") for x in predict_day]
+ predict_day_list = [x.strftime("%Y-%m-%d") for x in predict_day]
day_list_cut.extend(predict_day_list)
body = {"instances": []}
@@ -235,83 +237,82 @@ def predict(serving_url, model_stats, day_list, ucdoc_attribute_map, forward_off
return prediction_results, predict_day_list
-
if __name__ == '__main__': # record is equal to window size
URL = "http://10.193.217.105:8508/v1/models/dl_20210706:predict"
model_stats = {
"model": {"days": ["2020-03-01", "2020-03-02", "2020-03-03", "2020-03-04", "2020-03-05", "2020-03-06", "2020-03-07",
- "2020-03-08", "2020-03-09", "2020-03-10", "2020-03-11", "2020-03-12", "2020-03-13", "2020-03-14",
- "2020-03-15", "2020-03-16", "2020-03-17", "2020-03-18", "2020-03-19", "2020-03-20", "2020-03-21",
- "2020-03-22", "2020-03-23", "2020-03-24", "2020-03-25", "2020-03-26", "2020-03-27", "2020-03-28",
- "2020-03-29", "2020-03-30", "2020-03-31", "2020-04-01", "2020-04-02", "2020-04-03", "2020-04-04",
- "2020-04-05", "2020-04-06", "2020-04-07", "2020-04-08", "2020-04-09", "2020-04-10", "2020-04-11",
- "2020-04-12", "2020-04-13", "2020-04-14", "2020-04-15", "2020-04-16", "2020-04-17", "2020-04-18",
- "2020-04-19", "2020-04-20", "2020-04-21", "2020-04-22", "2020-04-23", "2020-04-24", "2020-04-25",
- "2020-04-26", "2020-04-27", "2020-04-28", "2020-04-29", "2020-04-30", "2020-05-01", "2020-05-02",
- "2020-05-03", "2020-05-04", "2020-05-05", "2020-05-06", "2020-05-07", "2020-05-08", "2020-05-09",
- "2020-05-10", "2020-05-11", "2020-05-12", "2020-05-13", "2020-05-14", "2020-05-15", "2020-05-16",
- "2020-05-17", "2020-05-18", "2020-05-19", "2020-05-20", "2020-05-21", "2020-05-22", "2020-05-23",
- "2020-05-24", "2020-05-25", "2020-05-26", "2020-05-27", "2020-05-28", "2020-05-29", "2020-05-30",
- "2020-05-31", "2020-06-01", "2020-06-02", "2020-06-03", "2020-06-04", "2020-06-05", "2020-06-06",
- "2020-06-07", "2020-06-08", "2020-06-09", "2020-06-10", "2020-06-11", "2020-06-12", "2020-06-13",
- "2020-06-14", "2020-06-15", "2020-06-16", "2020-06-17", "2020-06-18", "2020-06-19", "2020-06-20",
- "2020-06-21", "2020-06-22", "2020-06-23", "2020-06-24", "2020-06-25", "2020-06-26", "2020-06-27",
- "2020-06-28", "2020-06-29", "2020-06-30"], "duration": 112,
- "holidays_norm": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
- "name": "model_dlpm_06242021_1635", "predict_window": 10, "train_window": 60, "version": "version_06242021_1635"},
- "stats": {"a_":[0.04429662466026579,0.20533941439740103],"a_1":[0.0594024062498474,0.23536391091868636],
- "a_2":[0.13875638124033587,0.3443551863453624],"a_3":[0.18247821941727763,0.3847745538180887],
- "a_4":[0.23540840480301564,0.4227143144991038],"a_5":[0.20678420677160178,0.40351830940626193],
- "a_6":[0.1328737568768233,0.338151429839641],"g_":[0.03802539182225981,0.1910382838856348],
- "g_g_f":[0.31290878405353795,0.46186909160791834],"g_g_m":[0.6370381966764169,0.4790592824120438],
- "g_g_x":[0.012027627461027505,0.10846109268711399],
- "holiday_stats":[0.0,0.0],"ipl_":[1.0,1.0],"ipl_1":[0.0,1.0],"ipl_10":[0.0,1.0],"ipl_11":[0.0,1.0],"ipl_12":[0.0,1.0],"ipl_13":[0.0,1.0],"ipl_14":[0.0,1.0],"ipl_15":[0.0,1.0],"ipl_16":[0.0,1.0],"ipl_17":[0.0,1.0],"ipl_18":[0.0,1.0],"ipl_19":[0.0,1.0],"ipl_2":[0.0,1.0],"ipl_20":[0.0,1.0],"ipl_21":[0.0,1.0],"ipl_22":[0.0,1.0],"ipl_23":[0.0,1.0],"ipl_24":[0.0,1.0],"ipl_25":[0.0,1.0],"ipl_26":[0.0,1.0],"ipl_27":[0.0,1.0],"ipl_28":[0.0,1.0],"ipl_29":[0.0,1.0],"ipl_3":[0.0,1. [...]
- "page_popularity":[577.1606531140158,5127.251109603019],
- "price_cat_1":[0.6813400284793525,0.4659575570248394],"price_cat_2":[0.1969572060256314,0.3977003132674967],
- "price_cat_3":[0.12170276549501612,0.3269426904032709],"r_":[0.06428884716597416,0.2445368641073571],"r_1":[0.017636910330801243,0.13112898948090662],"r_10":[0.007311564935032408,0.08479574334903354],"r_11":[0.0123057065964087,0.10979549743303678],"r_12":[0.010682300938137547,0.10238183510308607],"r_13":[0.012839921663653402,0.11211387396512836],"r_14":[0.0043072282145927634,0.0652208481532438],"r_15":[0.005510994725533859,0.07372435219261837],"r_16":[0.0094308956999531 [...]
- "si_15e9ddce941b11e5bdec00163e291137":[0.0021734242674061304,0.046569378305571674],
- "si_17dd6d8098bf11e5bdec00163e291137":[8.064153488720677E-4,0.028386043378890852],
- "si_5cd1c663263511e6af7500163e291137":[0.024480251817432363,0.15453491734752325],"si_66bcd2720e5011e79bc8fa163e05184e":[0.0747118339204077,0.2629261931283321],"si_68bcd2720e5011e79bc8fa163e05184e":[0.00832196657423368,0.09084457158709529],"si_71bcd2720e5011e79bc8fa163e05184e":[0.0031866896500037474,0.056360839080046306],"si_7b0d7b55ab0c11e68b7900163e3e481d":[0.07978115865997153,0.270954508123256],"si_a290af82884e11e5bdec00163e291137":[0.08873866446826051,0.2843665879001 [...]
+ "2020-03-08", "2020-03-09", "2020-03-10", "2020-03-11", "2020-03-12", "2020-03-13", "2020-03-14",
+ "2020-03-15", "2020-03-16", "2020-03-17", "2020-03-18", "2020-03-19", "2020-03-20", "2020-03-21",
+ "2020-03-22", "2020-03-23", "2020-03-24", "2020-03-25", "2020-03-26", "2020-03-27", "2020-03-28",
+ "2020-03-29", "2020-03-30", "2020-03-31", "2020-04-01", "2020-04-02", "2020-04-03", "2020-04-04",
+ "2020-04-05", "2020-04-06", "2020-04-07", "2020-04-08", "2020-04-09", "2020-04-10", "2020-04-11",
+ "2020-04-12", "2020-04-13", "2020-04-14", "2020-04-15", "2020-04-16", "2020-04-17", "2020-04-18",
+ "2020-04-19", "2020-04-20", "2020-04-21", "2020-04-22", "2020-04-23", "2020-04-24", "2020-04-25",
+ "2020-04-26", "2020-04-27", "2020-04-28", "2020-04-29", "2020-04-30", "2020-05-01", "2020-05-02",
+ "2020-05-03", "2020-05-04", "2020-05-05", "2020-05-06", "2020-05-07", "2020-05-08", "2020-05-09",
+ "2020-05-10", "2020-05-11", "2020-05-12", "2020-05-13", "2020-05-14", "2020-05-15", "2020-05-16",
+ "2020-05-17", "2020-05-18", "2020-05-19", "2020-05-20", "2020-05-21", "2020-05-22", "2020-05-23",
+ "2020-05-24", "2020-05-25", "2020-05-26", "2020-05-27", "2020-05-28", "2020-05-29", "2020-05-30",
+ "2020-05-31", "2020-06-01", "2020-06-02", "2020-06-03", "2020-06-04", "2020-06-05", "2020-06-06",
+ "2020-06-07", "2020-06-08", "2020-06-09", "2020-06-10", "2020-06-11", "2020-06-12", "2020-06-13",
+ "2020-06-14", "2020-06-15", "2020-06-16", "2020-06-17", "2020-06-18", "2020-06-19", "2020-06-20",
+ "2020-06-21", "2020-06-22", "2020-06-23", "2020-06-24", "2020-06-25", "2020-06-26", "2020-06-27",
+ "2020-06-28", "2020-06-29", "2020-06-30"], "duration": 112,
+ "holidays_norm": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+ "name": "model_dlpm_06242021_1635", "predict_window": 10, "train_window": 60, "version": "version_06242021_1635"},
+ "stats": {"a_": [0.04429662466026579, 0.20533941439740103], "a_1": [0.0594024062498474, 0.23536391091868636],
+ "a_2": [0.13875638124033587, 0.3443551863453624], "a_3": [0.18247821941727763, 0.3847745538180887],
+ "a_4": [0.23540840480301564, 0.4227143144991038], "a_5": [0.20678420677160178, 0.40351830940626193],
+ "a_6": [0.1328737568768233, 0.338151429839641], "g_": [0.03802539182225981, 0.1910382838856348],
+ "g_g_f": [0.31290878405353795, 0.46186909160791834], "g_g_m": [0.6370381966764169, 0.4790592824120438],
+ "g_g_x": [0.012027627461027505, 0.10846109268711399],
+ "holiday_stats": [0.0, 0.0], "ipl_": [1.0, 1.0], "ipl_1": [0.0, 1.0], "ipl_10": [0.0, 1.0], "ipl_11": [0.0, 1.0], "ipl_12": [0.0, 1.0], "ipl_13": [0.0, 1.0], "ipl_14": [0.0, 1.0], "ipl_15": [0.0, 1.0], "ipl_16": [0.0, 1.0], "ipl_17": [0.0, 1.0], "ipl_18": [0.0, 1.0], "ipl_19": [0.0, 1.0], "ipl_2": [0.0, 1.0], "ipl_20": [0.0, 1.0], "ipl_21": [0.0, 1.0], "ipl_22": [0.0, 1.0], "ipl_23": [0.0, 1.0], "ipl_24": [0.0, 1.0], "ipl_25": [0.0, 1.0], "ipl_26": [0.0, 1.0], "ipl_27": [...]
+ "page_popularity": [577.1606531140158, 5127.251109603019],
+ "price_cat_1": [0.6813400284793525, 0.4659575570248394], "price_cat_2": [0.1969572060256314, 0.3977003132674967],
+ "price_cat_3": [0.12170276549501612, 0.3269426904032709], "r_": [0.06428884716597416, 0.2445368641073571], "r_1": [0.017636910330801243, 0.13112898948090662], "r_10": [0.007311564935032408, 0.08479574334903354], "r_11": [0.0123057065964087, 0.10979549743303678], "r_12": [0.010682300938137547, 0.10238183510308607], "r_13": [0.012839921663653402, 0.11211387396512836], "r_14": [0.0043072282145927634, 0.0652208481532438], "r_15": [0.005510994725533859, 0.07372435219261837], [...]
+ "si_15e9ddce941b11e5bdec00163e291137": [0.0021734242674061304, 0.046569378305571674],
+ "si_17dd6d8098bf11e5bdec00163e291137": [8.064153488720677E-4, 0.028386043378890852],
+ "si_5cd1c663263511e6af7500163e291137": [0.024480251817432363, 0.15453491734752325], "si_66bcd2720e5011e79bc8fa163e05184e": [0.0747118339204077, 0.2629261931283321], "si_68bcd2720e5011e79bc8fa163e05184e": [0.00832196657423368, 0.09084457158709529], "si_71bcd2720e5011e79bc8fa163e05184e": [0.0031866896500037474, 0.056360839080046306], "si_7b0d7b55ab0c11e68b7900163e3e481d": [0.07978115865997153, 0.270954508123256], "si_a290af82884e11e5bdec00163e291137": [0.08873866446826051 [...]
}
days = model_stats['model']['days']
x = [
4.919981, 4.912655, 4.912655, 5.1119876, 4.5217886, 5.638355, 5.673323, 5.874931, 5.6801724, 5.4026775,
- 6.2422233, 6.150603, 5.918894, 6.251904, 6.0497336, 6.075346, 5.9939613, 6.222576, 6.1944056, 6.1025586,
- 6.3261495, 5.811141, 5.3752785, 5.6903596, 5.4722705, 5.497168, 5.1357985, 4.919981, 5.117994, 4.9904327,
- 5.0998664, 5.159055, 5.2983174, 4.7004805, 0.0, 4.1431346, 4.4998097, 4.6151204, 4.6728287, 4.356709,
- 5.0238805, 5.749393, 5.4467373, 5.433722, 5.4764633, 4.9767337, 5.2832036, 5.996452, 6.011267, 6.089045,
- 5.8318825, 5.556828, 5.590987, 5.755742, 5.8406415, 5.7525725, 5.733341, 4.4188404, 0.0, 4.454347, 3.988984,
- 4.26268, 4.2904596, 4.804021, 4.4308167, 0.0, 6.0684257, 6.423247, 6.6795993, 6.6463904, 6.418365, 6.25575,
- 6.1984787, 6.2841344, 6.350886, 6.4361506, 6.3733196, 6.668228, 6.2186003, 5.8230457, 6.2461066, 5.9839363,
- 6.424869, 6.7730803, 6.510258, 6.591674, 6.767343, 6.666957, 5.529429, 5.075174, 4.248495, 0.0, 4.0943446,
- 6.685861, 6.8330317, 6.7214255, 6.5971456, 6.629363, 6.72263, 6.5117455, 6.6795993, 6.8211074, 7.004882,
- 6.9920964, 6.9641356, 6.991177, 6.6554403, 6.364751, 6.50129, 6.6346335, 6.43294, 6.9353704, 7.0030656,
- 6.7968235, 7.2174435, 7.183871, 7.3225102, 7.010312, 7.4506607, 6.8855095, 6.163315, 0.0]
+ 6.2422233, 6.150603, 5.918894, 6.251904, 6.0497336, 6.075346, 5.9939613, 6.222576, 6.1944056, 6.1025586,
+ 6.3261495, 5.811141, 5.3752785, 5.6903596, 5.4722705, 5.497168, 5.1357985, 4.919981, 5.117994, 4.9904327,
+ 5.0998664, 5.159055, 5.2983174, 4.7004805, 0.0, 4.1431346, 4.4998097, 4.6151204, 4.6728287, 4.356709,
+ 5.0238805, 5.749393, 5.4467373, 5.433722, 5.4764633, 4.9767337, 5.2832036, 5.996452, 6.011267, 6.089045,
+ 5.8318825, 5.556828, 5.590987, 5.755742, 5.8406415, 5.7525725, 5.733341, 4.4188404, 0.0, 4.454347, 3.988984,
+ 4.26268, 4.2904596, 4.804021, 4.4308167, 0.0, 6.0684257, 6.423247, 6.6795993, 6.6463904, 6.418365, 6.25575,
+ 6.1984787, 6.2841344, 6.350886, 6.4361506, 6.3733196, 6.668228, 6.2186003, 5.8230457, 6.2461066, 5.9839363,
+ 6.424869, 6.7730803, 6.510258, 6.591674, 6.767343, 6.666957, 5.529429, 5.075174, 4.248495, 0.0, 4.0943446,
+ 6.685861, 6.8330317, 6.7214255, 6.5971456, 6.629363, 6.72263, 6.5117455, 6.6795993, 6.8211074, 7.004882,
+ 6.9920964, 6.9641356, 6.991177, 6.6554403, 6.364751, 6.50129, 6.6346335, 6.43294, 6.9353704, 7.0030656,
+ 6.7968235, 7.2174435, 7.183871, 7.3225102, 7.010312, 7.4506607, 6.8855095, 6.163315, 0.0]
days = days[:-10]
x = x[:-10]
si = [-0.04667067527770996, -0.2841551601886749, -0.29444485902786255, -0.06350809335708618, -0.1994006484746933,
- -0.11050120741128922, -0.028408868238329887, -0.15841242671012878, -0.09160664677619934,
- -0.056540846824645996, -0.3120572865009308, -0.5284554362297058, -0.32392826676368713, -0.06355565786361694,
- -0.13475105166435242, 6.289364814758301, -0.21588164567947388, -0.23473970592021942, -0.06135460361838341,
- -0.06582210212945938, -0.15293772518634796, -0.1407172530889511, -0.09954438358545303, -0.05613924190402031,
- -0.15412424504756927, -0.3370814323425293, -0.1369824856519699]
+ -0.11050120741128922, -0.028408868238329887, -0.15841242671012878, -0.09160664677619934,
+ -0.056540846824645996, -0.3120572865009308, -0.5284554362297058, -0.32392826676368713, -0.06355565786361694,
+ -0.13475105166435242, 6.289364814758301, -0.21588164567947388, -0.23473970592021942, -0.06135460361838341,
+ -0.06582210212945938, -0.15293772518634796, -0.1407172530889511, -0.09954438358545303, -0.05613924190402031,
+ -0.15412424504756927, -0.3370814323425293, -0.1369824856519699]
forward_offset = 0
- response = predict(serving_url = URL, model_stats=model_stats, day_list=days, ucdoc_attribute_map = {'uckey': 'magazinelock,1,3G,g_f,2,pt,1004,icc',
- 'ts': [], 'price_cat': '1',
- 'p':462.7049255371094 , 'a__n': 4.654261589050293, 'a_1_n': -0.25238537788391113, 'a_2_n':-0.40294551849365234,
- 'a_3_n': -0.4742470979690552, 'a_4_n': -0.5568971633911133, 'a_5_n':-0.5124530792236328, 'a_6_n':-0.39294159412384033,
- 't_UNKNOWN_n': -0.0056058494374156, 't_3G_n': -0.09904143214225769, 't_4G_n': -0.853390634059906, 't_WIFI_n': 0.8717950582504272,
- 't_2G_n':-0.02846473827958107, 'g__n': 5.035506725311279,
- 'g_g_f_n': -0.6774836778640747, 'g_g_m_n':-1.3297690153121948, 'g_g_x_n':-0.11089347302913666 ,
- 'price_cat_1_n':0.6838819980621338 , 'price_cat_2_n': -0.49524027, 'price_cat_3_n': -0.37224495,
- 'si_vec_n': si, 'r_vec_n': [], 'p_n': -0.02232302 , 'ts_n': x, 'page_ix':'native,d9jucwkpr3,WIFI,,,CPM,,60-1'}, forward_offset = forward_offset )
-
- print(response)
\ No newline at end of file
+ response = predict(serving_url=URL, model_stats=model_stats, day_list=days, ucdoc_attribute_map={'uckey': 'magazinelock,1,3G,g_f,2,pt,1004,icc',
+ 'ts': [], 'price_cat': '1',
+ 'p': 462.7049255371094, 'a__n': 4.654261589050293, 'a_1_n': -0.25238537788391113, 'a_2_n': -0.40294551849365234,
+ 'a_3_n': -0.4742470979690552, 'a_4_n': -0.5568971633911133, 'a_5_n': -0.5124530792236328, 'a_6_n': -0.39294159412384033,
+ 't_UNKNOWN_n': -0.0056058494374156, 't_3G_n': -0.09904143214225769, 't_4G_n': -0.853390634059906, 't_WIFI_n': 0.8717950582504272,
+ 't_2G_n': -0.02846473827958107, 'g__n': 5.035506725311279,
+ 'g_g_f_n': -0.6774836778640747, 'g_g_m_n': -1.3297690153121948, 'g_g_x_n': -0.11089347302913666,
+ 'price_cat_1_n': 0.6838819980621338, 'price_cat_2_n': -0.49524027, 'price_cat_3_n': -0.37224495,
+ 'si_vec_n': si, 'r_vec_n': [], 'p_n': -0.02232302, 'ts_n': x, 'page_ix': 'native,d9jucwkpr3,WIFI,,,CPM,,60-1'}, forward_offset=forward_offset)
+
+ print(response)
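For reference, get_start_end above slides a train_window-sized slice back from the end of the record by forward_offset days; a quick numeric check with hypothetical lengths:

    def get_start_end(records_len, train_window, forward_offset):
        start = records_len - train_window - forward_offset
        end = records_len - forward_offset
        return start, end

    print(get_start_end(112, 60, 0))   # (52, 112) -> train on the latest 60 days
    print(get_start_end(112, 60, 10))  # (42, 102) -> same window shifted 10 days back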
diff --git a/Model/predictor-dl-model/scripts/check_stable_si.py b/Model/predictor-dl-model/scripts/check_stable_si.py
new file mode 100644
index 0000000..dbfe32b
--- /dev/null
+++ b/Model/predictor-dl-model/scripts/check_stable_si.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+
+import pyspark.sql.functions as fn
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+
+''' Read the ts table and calculate the daily traffic for each SI '''
+
+def run(hive_context, input_table_name):
+
+    # Read the ts table (si and ts columns)
+ command = """
+ SELECT si,ts FROM {}
+ """.format(input_table_name)
+
+    # DataFrame[si: string, ts: array<int>]
+ df = hive_context.sql(command)
+ columns = ['ts']
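+    # Explode the ts array into one column per day: find the longest array,
+    # then select element i of ts for each i in [0, max_len); shorter arrays
+    # end up with nulls that na.fill pads with 0. For example, a row
+    # ('si1', [3, 5]) becomes ('si1', 3, 5).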
+ df_sizes = df.select(*[fn.size(col).alias(col) for col in columns])
+ df_max = df_sizes.agg(*[fn.max(col).alias(col) for col in columns])
+ max_dict = df_max.collect()[0].asDict()
+ df_result = df.select('si', *[df[col][i] for col in columns for i in range(max_dict[col])])
+ df_result = df_result.na.fill(value=0)
+ df_result.toPandas().to_csv('total_si')
+
+
+
+if __name__ == '__main__':
+ sc = SparkContext()
+ hive_context = HiveContext(sc)
+
+ run(hive_context=hive_context, input_table_name="dlpm_10052021_1400_tmp_ts")
\ No newline at end of file
diff --git a/Model/predictor-dl-model/scripts/import_factdata_files_1.py b/Model/predictor-dl-model/scripts/import_factdata_files_1.py
new file mode 100644
index 0000000..5617a4b
--- /dev/null
+++ b/Model/predictor-dl-model/scripts/import_factdata_files_1.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+'''
+This file processes raw tables created by import_factdata_files.py.
+
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict import_factdata_files_1.py
+
+'''
+
+
+from pyspark.sql.types import IntegerType, ArrayType, StringType
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+from pyspark.sql.functions import when, regexp_replace, split, col, udf
+import hashlib
+
+
+def run(hive_context, input_table_name, factdata_table_name):
+
+ # select the data frame and process it
+ command = """SELECT * FROM {}""".format(input_table_name)
+ df = hive_context.sql(command)
+
+ df = df.withColumn("bucket_id", df["bucket_id"].cast(IntegerType()))
+ df = df.withColumn("hour", df["hour"].cast(IntegerType()))
+ df = df.withColumn("count_array", when(df.count_array.endswith("]"), regexp_replace(df.count_array, "\]", "")))
+ df = df.withColumn("count_array", when(df.count_array.startswith("["), regexp_replace(df.count_array, "\[", "")))
+ df = df.withColumn("count_array", regexp_replace(df.count_array, '\"', ''))
+ df = df.withColumn("count_array", split(col("count_array"), ",").cast(ArrayType(StringType())))
+ df = df.filter("count_array IS NOT NULL")
+
+ command = """CREATE TABLE IF NOT EXISTS {} (uckey STRING, count_array array<string>, hour INT, day STRING) PARTITIONED BY (bucket_id INT)""".format(factdata_table_name)
+ hive_context.sql(command)
+
+ # write the dataframe into the partitioned table
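+    # (insertInto relies on the dynamic-partition settings passed to
+    # spark-submit above: hive.exec.dynamic.partition=true, mode=nonstrict)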
+ df.write.option("header", "true").option("encoding", "UTF-8").mode("append").format('hive').insertInto(factdata_table_name)
+
+
+if __name__ == "__main__":
+
+ sc = SparkContext.getOrCreate()
+ hive_context = HiveContext(sc)
+ factdata_table_name = 'factdata_10182021'
+ input_table_name = "adhoctemp_tmp_z00380608_20210731_factdata_dm_01"
+ run(hive_context=hive_context, input_table_name=input_table_name, factdata_table_name=factdata_table_name)
+ sc.stop()
diff --git a/Model/predictor-dl-model/tests/pipeline/test_base.py b/Model/predictor-dl-model/tests/pipeline/test_base.py
index 2b5ccf8..10430fd 100644
--- a/Model/predictor-dl-model/tests/pipeline/test_base.py
+++ b/Model/predictor-dl-model/tests/pipeline/test_base.py
@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-
+
# http://www.apache.org/licenses/LICENSE-2.0.html
# Unless required by applicable law or agreed to in writing, software
@@ -17,15 +17,19 @@
import unittest
import yaml
from pyspark import SparkContext
-from pyspark.sql import HiveContext
+from pyspark.sql import SparkSession, HiveContext
class TestBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
- with open('config.yml', 'r') as ymlfile:
- cfg = yaml.load(ymlfile)
+ try:
+ with open('config.yml', 'r') as ymlfile:
+                cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
+ except IOError:
+ cfg = {'test': {'timer': 10}}
+
cls.cfg = cfg
sc = SparkContext().getOrCreate()
sc.setLogLevel('warn')
diff --git a/Model/predictor-dl-model/troubleshooting/check_model.py b/Model/predictor-dl-model/troubleshooting/check_model.py
new file mode 100644
index 0000000..387e458
--- /dev/null
+++ b/Model/predictor-dl-model/troubleshooting/check_model.py
@@ -0,0 +1,288 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import math
+import pickle
+import statistics
+import yaml
+import argparse
+import re
+import hashlib
+
+import pyspark.sql.functions as fn
+import numpy as np
+
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from pyspark.sql.types import IntegerType, StringType, MapType
+from datetime import datetime, timedelta
+
+'''
+
+This script performs the following actions:
+
+1. Calls the model API with N randomly picked dense uckeys from the trainready table (the same data that was used to train the model).
+2. Calculates the accuracy of the model.
+
+
+run by:
+spark-submit --master yarn --num-executors 5 --executor-cores 3 --executor-memory 16G --driver-memory 16G check_model.py
+
+'''
+
+from client_rest_dl2 import predict
+
+
+def c_error(x, y):
+ x = x * 1.0
+ if x != 0:
+ e = abs(x - y) / x
+ else:
+ e = -1
+ e = round(e, 3)
+ return e
+
+
+def error_m(a, p):
+ result = []
+ for i in range(len(a)):
+ x = a[i]
+ y = p[i]
+ e = c_error(x, y)
+ result.append(e)
+ x = sum(a)
+ y = sum(p)
+ e = c_error(x, y)
+ return (e, result)
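+# Worked example: error_m([10, 20], [8, 22]) returns (0.0, [0.2, 0.1]):
+# the per-day relative errors, plus an overall error comparing the sums
+# (30 vs 30, hence 0.0)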
+
+
+def normalize_ts(ts):
+ ts_n = [math.log(i + 1) for i in ts]
+ return ts_n
+
+
+def dl_daily_forecast(serving_url, model_stats, day_list, ucdoc_attribute_map):
+ x, y = predict(serving_url=serving_url, model_stats=model_stats,
+ day_list=day_list, ucdoc_attribute_map=ucdoc_attribute_map, forward_offset=0)
+ ts = x[0]
+ days = y
+ return ts, days
+
+
+def get_model_stats(hive_context, model_stat_table):
+ '''
+ return a dict
+ model_stats = {
+ "model": {
+ "name": "s32",
+ "version": 1,
+ "duration": 90,
+ "train_window": 60,
+ "predict_window": 10
+ },
+ "stats": {
+ "g_g_m": [
+ 0.32095959595959594,
+ 0.4668649491714752
+ ],
+ "g_g_f": [
+ 0.3654040404040404,
+ 0.4815635452904544
+ ],
+ "g_g_x": [
+ 0.31363636363636366,
+ 0.46398999646418304
+ ],
+ '''
+ command = """
+ SELECT * FROM {}
+ """.format(model_stat_table)
+ df = hive_context.sql(command)
+ rows = df.collect()
+ if len(rows) != 1:
+ raise Exception('Bad model stat table {} '.format(model_stat_table))
+ model_info = rows[0]['model_info']
+ model_stats = rows[0]['stats']
+ result = {
+ 'model': model_info,
+ 'stats': model_stats
+ }
+ return result
+
+
+def predict_daily_uckey(sample, days, serving_url, model_stats, columns):
+
+ def _denoise(ts):
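+        # zero out entries below one tenth of the mean of the non-zero
+        # entries; such small values are treated as noise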
+ non_zero_ts = [_ for _ in ts if _ != 0]
+ nonzero_p = 0.0
+ if len(non_zero_ts) > 0:
+ nonzero_p = 1.0 * sum(ts) / len(non_zero_ts)
+
+ return [i if i > (nonzero_p / 10.0) else 0 for i in ts]
+
+ def _helper(cols):
+ day_list = days[:]
+ ucdoc_attribute_map = {}
+ for feature in columns:
+ ucdoc_attribute_map[feature] = cols[feature]
+
+ # determine ts_n and days
+ model_input_ts = []
+
+ # -----------------------------------------------------------------------------------------------
+ '''
+        The following code is used in dlpredictor, where ts has a different format:
+
+ 'ts': [0, 0, 0, 0, 0, 65, 47, 10, 52, 58, 27, 55, 23, 44, 38, 42, 90, 26, 95, 34, 25, 26, 18, 66, 31,
+ 0, 38, 26, 30, 49, 35, 61, 0, 55, 23, 44, 35, 33, 22, 25, 28, 72, 25, 15, 29, 29, 9, 32, 18, 20, 70,
+ 20, 4, 11, 15, 10, 8, 3, 0, 5, 3, 0, 23, 11, 44, 11, 11, 8, 3, 38, 3, 28, 16, 3, 4, 20, 5, 4, 45, 15, 9, 3, 60, 27, 15, 17, 5, 6, 0, 7, 12, 0],
+
+
+ # ts = {u'2019-11-02': [u'1:862', u'3:49', u'2:1154'], u'2019-11-03': [u'1:596', u'3:67', u'2:1024']}
+ ts = ucdoc_attribute_map['ts'][0]
+ price_cat = ucdoc_attribute_map['price_cat']
+
+ for day in day_list:
+ imp = 0.0
+ if day in ts:
+ count_array = ts[day]
+ for i in count_array:
+ parts = i.split(':')
+ if parts[0] == price_cat:
+ imp = float(parts[1])
+ break
+ model_input_ts.append(imp)
+
+
+ '''
+ model_input_ts = ucdoc_attribute_map['ts']
+ price_cat = ucdoc_attribute_map['price_cat']
+
+ # --------------------------------------------------------------------------------------------------------
+
+        # removed since 06/21/2021
+ # model_input_ts = replace_with_median(model_input_ts)
+
+ model_input_ts = _denoise(model_input_ts)
+
+ ts_n = normalize_ts(model_input_ts)
+ ucdoc_attribute_map['ts_n'] = ts_n
+
+ # add page_ix
+ page_ix = ucdoc_attribute_map['uckey'] + '-' + price_cat
+ ucdoc_attribute_map['page_ix'] = page_ix
+
+ rs_ts, rs_days = dl_daily_forecast(
+ serving_url=serving_url, model_stats=model_stats, day_list=day_list, ucdoc_attribute_map=ucdoc_attribute_map)
+
+        # response = {'2019-11-02': 220.0, '2019-11-03': 305.0}
+
+ response = {}
+ for i, day in enumerate(rs_days):
+ response[day] = rs_ts[i]
+ return response
+
+ return _helper(cols=sample)
+
+
+def run(cfg, hive_context):
+
+ model_stats = get_model_stats(hive_context, cfg['model_stat_table'])
+
+    # day_list comes from the model stats and covers the training duration
+ duration = model_stats['model']['duration']
+ predict_window = model_stats['model']['predict_window']
+ day_list = model_stats['model']['days']
+ day_list.sort()
+
+ local = False
+ if not local:
+ df_trainready = hive_context.sql(
+ 'SELECT * FROM {}'.format(cfg['trainready_table']))
+ df_dist = hive_context.sql(
+ 'SELECT * FROM {} WHERE ratio=1'.format(cfg['dist_table']))
+ df = df_trainready.join(
+ df_dist, on=['uckey', 'price_cat'], how='inner')
+ columns = df.columns
+ samples = df.take(cfg['max_calls'])
+ else:
+ sample = {'si_vec_n': [-0.4151402711868286, 2.9644479751586914, -0.3145267963409424, -0.26219648122787476, -0.3064562976360321, -0.28393232822418213, -0.28601524233818054, -0.27245578169822693, -0.23727722465991974, -0.1847621202468872, -0.1882103681564331, -0.18137064576148987, -0.17601335048675537, -0.14012782275676727, -0.17195084691047668, -0.13098371028900146, -0.10281818360090256, -0.11568441241979599, -0.08055911213159561, -0.09745623171329498, -0.032780639827251434, -0.04 [...]
+ [...]
+ sample1 = {'si_vec_n': [-0.4151402711868286, 2.9644479751586914, -0.3145267963409424, -0.26219648122787476, -0.3064562976360321, -0.28393232822418213, -0.28601524233818054, -0.27245578169822693, -0.23727722465991974, -0.1847621202468872, -0.1882103681564331, -0.18137064576148987, -0.17601335048675537, -0.14012782275676727, -0.17195084691047668, -0.13098371028900146, -0.10281818360090256, -0.11568441241979599, -0.08055911213159561, -0.09745623171329498, -0.032780639827251434, -0.0 [...]
+ [...]
+
+ sample2 = {'si_vec_n': [-0.4151402711868286, 2.9644479751586914, -0.3145267963409424, -0.26219648122787476, -0.3064562976360321, -0.28393232822418213, -0.28601524233818054, -0.27245578169822693, -0.23727722465991974, -0.1847621202468872, -0.1882103681564331, -0.18137064576148987, -0.17601335048675537, -0.14012782275676727, -0.17195084691047668, -0.13098371028900146, -0.10281818360090256, -0.11568441241979599, -0.08055911213159561, -0.09745623171329498, -0.032780639827251434, -0.0 [...]
+ [...]
+
+ samples = [sample, sample1, sample2]
+ columns = sample.keys()
+
+ day_list = day_list[:-predict_window]
+
+ errs = []
+ for _ in samples:
+ sample = {}
+ for feature in columns:
+ sample[feature] = _[feature]
+
+ whole_ts = sample['ts'][:]
+ expected = whole_ts[-predict_window:]
+ sample['ts'] = whole_ts[:-predict_window]
+ input_ts = sample['ts']
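+        # the last predict_window days are held out as ground truth; the
+        # model only sees the earlier part of the series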
+
+ # zipped = zip(day_list, input_ts)
+ # print(zipped)
+ # print(len(zipped))
+
+ print("-------------------------------------")
+ # print(sample)
+ print("-------------------------------------")
+
+ response = predict_daily_uckey(
+ sample=sample, days=day_list, serving_url=cfg['serving_url'], model_stats=model_stats, columns=columns)
+ predicted = [response[_] for _ in sorted(response)]
+
+ for i, v in enumerate(expected):
+ if v == 0:
+ predicted[i] = 0
+ print(zip(expected, predicted))
+ e = error_m(expected, predicted)[0]
+ print(e)
+ if e < 0:
+ e = 0
+ errs.append(e)
+
+ print(sum(errs)/(len(errs)*1.0))
+
+
+if __name__ == '__main__':
+
+ cfg = {
+ 'log_level': 'warn',
+ 'trainready_table': 'dlpm_110221_no_residency_no_mapping_trainready',
+ 'dist_table': 'dlpm_110221_no_residency_no_mapping_tmp_distribution',
+ 'serving_url': 'http://10.193.217.105:8506/v1/models/dl_no_mapped_ipl:predict',
+ 'sample_ratio': 1,
+ 'max_calls': 1000,
+ 'model_stat_table': 'dlpm_110221_no_residency_no_mapping_model_stat',
+ 'yesterday': 'WILL BE SET IN PROGRAM'}
+
+ sc = SparkContext.getOrCreate()
+ hive_context = HiveContext(sc)
+ sc.setLogLevel(cfg['log_level'])
+
+ run(cfg=cfg, hive_context=hive_context)
diff --git a/Model/predictor-dl-model/troubleshooting/client_rest_dl2.py b/Model/predictor-dl-model/troubleshooting/client_rest_dl2.py
new file mode 100644
index 0000000..a2ce5f6
--- /dev/null
+++ b/Model/predictor-dl-model/troubleshooting/client_rest_dl2.py
@@ -0,0 +1,318 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+A client that talks to tensorflow_serving loaded with the kaggle model.
+
+The client reads the kaggle feature data set, queries the service with
+that feature data to get predictions, and calculates the inference error rate.
+
+"""
+
+import pickle
+import json
+import requests
+import numpy as np
+from typing import List
+import pandas as pd
+import datetime
+import math
+import sys
+
+# from predictor_dl_model.pipeline.util import get_dow
+
+
+def get_start_end(records_len, train_window, forward_offset):
+ start = records_len - train_window - forward_offset
+ end = records_len - forward_offset
+ return start, end
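+# e.g. get_start_end(120, 60, 0) returns (60, 120): the training window is
+# the most recent train_window records, shifted back by forward_offset when
+# predicting from an earlier day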
+
+
+def get_dow(day_list):
+ dow_list = []
+ for day in day_list:
+ dow = datetime.datetime.strptime(day, '%Y-%m-%d').weekday()
+ dow_list.append(dow)
+
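+    # encode day-of-week on the unit circle: sin(2*pi*dow/7), cos(2*pi*dow/7),
+    # which keeps the week cyclic so Sunday (6) and Monday (0) are neighbors;
+    # e.g. Monday (dow=0) encodes as (0.0, 1.0)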
+ week_period = 7.0 / (2 * math.pi)
+ sin_list = [math.sin(x / week_period) for x in dow_list]
+ cos_list = [math.cos(x / week_period) for x in dow_list]
+ return (sin_list, cos_list)
+
+
+def lag_indexes(day_list):
+ """
+    Calculates indexes for 30- and 60-day backward lags over the given date range
+    :param day_list: sorted list of days covering the date range
+    :return: List of 2 Series, one for each lag. For each Series, the index is a date in the range and the value is
+    the index of the target (lagged) date in the same Series. If the target date falls outside the range, the value is -1
+ """
+ date_range = pd.date_range(day_list[0], day_list[-1])
+ # key is date, value is day index
+ base_index = pd.Series(np.arange(0, len(date_range)), index=date_range)
+
+ def lag(offset):
+ dates = date_range - offset
+ return pd.Series(data=base_index[dates].fillna(-1).astype(np.int64).values, index=date_range)
+
+ return [lag(pd.DateOffset(days=m)) for m in (30, 60)]
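+# Worked example: with day_list starting at 2020-03-01, the 30-day lag series
+# maps 2020-03-31 (index 30) back to index 0; dates whose lagged date falls
+# before the start of the range get -1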
+
+
+def make_pred_input(duration, train_window, predict_window, full_record_exp, x_hits, dow, lagged_ix, pf_age, pf_si,
+ pf_network,
+ pf_gender, page_ix, pf_price_cat,
+ page_popularity, quarter_autocorr, forward_offset):
+ """
+ Main method. Assembles input data into final tensors
+ """
+ # x_dow, y_dow = tf.split(dow, [train_window, predict_window], axis=0)
+ x_dow = dow[:train_window]
+ y_dow = dow[train_window:]
+ # Normalize hits
+ mean = np.mean(x_hits)
+ std = np.std(x_hits)
+ if std == 0:
+ std = 1
+ norm_x_hits = [(_ - mean) / std for _ in x_hits]
+
+ # lagged_ix = np.where(lagged_ix==-1, np.NaN, lagged_ix)
+ cropped_lags = lagged_ix
+ # Mask for -1 (no data) lag indexes
+ lag_mask = cropped_lags < 0
+    # Convert -1 to 0 for gather(); it doesn't accept negative indexes
+ cropped_lags = np.maximum(cropped_lags, 0)
+ cropped_lags = np.where(cropped_lags > len(full_record_exp) - 1, len(full_record_exp) - 1, cropped_lags)
+ # Translate lag indexes to hit values
+ lagged_hit = np.take(full_record_exp, cropped_lags)
+ # Convert masked (see above) or NaN lagged hits to zeros
+ lag_zeros = np.zeros_like(lagged_hit)
+ lagged_hit = np.where(lag_mask | np.isnan(
+ lagged_hit), lag_zeros, lagged_hit)
+ start, end = get_start_end(duration, train_window, forward_offset)
+ lagged_hit = lagged_hit[start:end + predict_window]
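+    # normalize the lagged hits with the mean/std of the current window so
+    # both features share one scale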
+ norm_lagged_hits = np.divide(np.subtract(lagged_hit, mean), std)
+
+ # stat = {'mean':mean, 'std':std}
+ # Split lagged hits to train and predict
+ x_lagged, y_lagged = norm_lagged_hits[:
+ train_window], norm_lagged_hits[train_window:]
+
+ # Combine all page features into single tensor
+ stacked_features = np.stack([page_popularity, quarter_autocorr])
+
+ flat_ucdoc_features = np.concatenate([pf_age, pf_si, pf_network, pf_gender, pf_price_cat, stacked_features],
+ axis=0) # pf_region
+ ucdoc_features = np.expand_dims(flat_ucdoc_features, 0)
+
+ # Train features
+ x_features = np.concatenate([
+ # [n_days] -> [n_days, 1]
+ np.expand_dims(norm_x_hits, -1),
+ x_dow,
+ x_lagged,
+ # Stretch ucdoc_features to all training days
+ # [1, features] -> [n_days, features]
+ np.tile(ucdoc_features, [train_window, 1])], axis=1
+ )
+ y_features = np.concatenate([
+ # [n_days] -> [n_days, 1]
+ y_dow,
+ y_lagged,
+ # Stretch ucdoc_features to all testing days
+ # [1, features] -> [n_days, features]
+ np.tile(ucdoc_features, [predict_window, 1])
+ ], axis=1)
+
+ return x_hits, x_features, norm_x_hits, x_lagged, y_features, mean, std, flat_ucdoc_features, page_ix # , stat
+
+
+def get_predict_post_body(model_stats, day_list, day_list_cut, page_ix, pf_age, pf_si, pf_network, pf_gender,
+ full_record, hits, pf_price_cat, predict_day_list, forward_offset):
+
+
+ train_window = model_stats['model']['train_window'] # comes from cfg
+ predict_window = model_stats['model']['predict_window'] # comes from cfg
+    x_hits = np.log(np.add(hits, 1)).tolist()  # ln(x + 1)
+ full_record_exp = np.log(np.add(full_record, 1)).tolist()
+
+ # TODO: Added
+ duration = len(full_record_exp)
+
+ if len(day_list_cut) != train_window + predict_window:
+        raise Exception('day_list_cut does not match train_window + predict_window. {} {} {}'.format(
+ len(day_list_cut), train_window, predict_window))
+
+ dow = get_dow(day_list_cut)
+ dow = [[dow[0][i], dow[1][i]] for i in range(train_window + predict_window)]
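+    # extend day_list with the predict days (at most once) if any of them are
+    # missing, so the lag indexes below also cover the prediction horizon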
+ for x in predict_day_list:
+ if x not in day_list:
+ day_list.extend(predict_day_list)
+ lagged_indx = np.stack(lag_indexes(day_list), axis=-1)
+
+ # not used in the model (but we should keep it)
+ page_popularity = np.mean(full_record)
+ page_popularity = (
+ page_popularity - model_stats['stats']['page_popularity'][0]) / \
+ model_stats['stats']['page_popularity'][1]
+ quarter_autocorr = 1
+
+ # x_hits, x_features, norm_x_hits, x_lagged, y_features, mean, std, flat_ucdoc_features, page_ix
+ truex, timex, normx, laggedx, timey, normmean, normstd, pgfeatures, pageix = make_pred_input(duration,
+ train_window,
+ predict_window,
+ full_record_exp,
+ x_hits, dow,
+ lagged_indx, pf_age,
+ pf_si, pf_network,
+ pf_gender, page_ix,
+ pf_price_cat,
+ page_popularity,
+ quarter_autocorr, forward_offset)
+
+    # ys are not important
+ truey = [1 for _ in range(predict_window)]
+ normy = [1 for _ in range(predict_window)]
+
+ instance = {"truex": truex, "timex": timex.tolist(), "normx": normx,
+ "laggedx": laggedx.tolist(),
+ "truey": truey, "timey": timey.tolist(), "normy": normy,
+ "normmean": normmean,
+ "normstd": normstd, "page_features": pgfeatures.tolist(),
+ "pageix": pageix}
+ # print(instance)
+ return instance # , stat
+
+
+def predict(serving_url, model_stats, day_list, ucdoc_attribute_map, forward_offset):
+ page_ix = ucdoc_attribute_map['page_ix']
+ pf_age = [ucdoc_attribute_map['a__n'], ucdoc_attribute_map['a_1_n'], ucdoc_attribute_map['a_2_n'], ucdoc_attribute_map['a_3_n'],
+ ucdoc_attribute_map['a_4_n'], ucdoc_attribute_map['a_5_n'], ucdoc_attribute_map['a_6_n']]
+ pf_si = ucdoc_attribute_map['si_vec_n']
+ pf_network = [ucdoc_attribute_map['t_2G_n'], ucdoc_attribute_map['t_3G_n'], ucdoc_attribute_map['t_4G_n'],
+ ucdoc_attribute_map['t_UNKNOWN_n'], ucdoc_attribute_map['t_WIFI_n'], ]
+ pf_gender = [ucdoc_attribute_map['g__n'], ucdoc_attribute_map['g_g_f_n'], ucdoc_attribute_map['g_g_m_n'], ucdoc_attribute_map['g_g_x_n']]
+ pf_price_cat = [ucdoc_attribute_map['price_cat_1_n'], ucdoc_attribute_map['price_cat_2_n'], ucdoc_attribute_map['price_cat_3_n']]
+ full_record = np.expm1(ucdoc_attribute_map['ts_n'])
+
+    # day_list is sorted from past to present [2018-01-01, 2018-01-02, ...]
+ prediction_results = []
+ # for full_record, price_cat in records_hour_price_list:
+ train_window = model_stats['model']['train_window']
+ prediction_window = model_stats['model']['predict_window']
+ start = len(full_record) - train_window - forward_offset
+ end = len(full_record) - forward_offset
+ hits = full_record[start:end]
+
+ day_list_cut = day_list[start:end]
+ predict_day = [datetime.datetime.strptime(day_list_cut[-1], "%Y-%m-%d").date() + datetime.timedelta(days=x + 1) for
+ x in range(prediction_window)]
+ predict_day_list = [x.strftime("%Y-%m-%d") for x in predict_day]
+ day_list_cut.extend(predict_day_list)
+
+ body = {"instances": []}
+ instance = get_predict_post_body(
+ model_stats, day_list, day_list_cut, page_ix, pf_age, pf_si, pf_network, pf_gender, full_record, hits, pf_price_cat, predict_day_list, forward_offset)
+ body['instances'].append(instance)
+
+ body_json = json.dumps(body)
+ result = requests.post(serving_url, data=body_json).json()
+ predictions = result['predictions'][0]
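+    # the model predicts in log1p space; expm1 maps the values back to
+    # impression counts before rounding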
+ predictions = np.round(np.expm1(predictions))
+ prediction_results.append(predictions.tolist())
+
+ return prediction_results, predict_day_list
+
+
+if __name__ == '__main__': # record is equal to window size
+ URL = "http://10.193.217.105:8508/v1/models/dl_20210706:predict"
+
+ model_stats = {
+ "model": {"days": ["2020-03-01", "2020-03-02", "2020-03-03", "2020-03-04", "2020-03-05", "2020-03-06", "2020-03-07",
+ "2020-03-08", "2020-03-09", "2020-03-10", "2020-03-11", "2020-03-12", "2020-03-13", "2020-03-14",
+ "2020-03-15", "2020-03-16", "2020-03-17", "2020-03-18", "2020-03-19", "2020-03-20", "2020-03-21",
+ "2020-03-22", "2020-03-23", "2020-03-24", "2020-03-25", "2020-03-26", "2020-03-27", "2020-03-28",
+ "2020-03-29", "2020-03-30", "2020-03-31", "2020-04-01", "2020-04-02", "2020-04-03", "2020-04-04",
+ "2020-04-05", "2020-04-06", "2020-04-07", "2020-04-08", "2020-04-09", "2020-04-10", "2020-04-11",
+ "2020-04-12", "2020-04-13", "2020-04-14", "2020-04-15", "2020-04-16", "2020-04-17", "2020-04-18",
+ "2020-04-19", "2020-04-20", "2020-04-21", "2020-04-22", "2020-04-23", "2020-04-24", "2020-04-25",
+ "2020-04-26", "2020-04-27", "2020-04-28", "2020-04-29", "2020-04-30", "2020-05-01", "2020-05-02",
+ "2020-05-03", "2020-05-04", "2020-05-05", "2020-05-06", "2020-05-07", "2020-05-08", "2020-05-09",
+ "2020-05-10", "2020-05-11", "2020-05-12", "2020-05-13", "2020-05-14", "2020-05-15", "2020-05-16",
+ "2020-05-17", "2020-05-18", "2020-05-19", "2020-05-20", "2020-05-21", "2020-05-22", "2020-05-23",
+ "2020-05-24", "2020-05-25", "2020-05-26", "2020-05-27", "2020-05-28", "2020-05-29", "2020-05-30",
+ "2020-05-31", "2020-06-01", "2020-06-02", "2020-06-03", "2020-06-04", "2020-06-05", "2020-06-06",
+ "2020-06-07", "2020-06-08", "2020-06-09", "2020-06-10", "2020-06-11", "2020-06-12", "2020-06-13",
+ "2020-06-14", "2020-06-15", "2020-06-16", "2020-06-17", "2020-06-18", "2020-06-19", "2020-06-20",
+ "2020-06-21", "2020-06-22", "2020-06-23", "2020-06-24", "2020-06-25", "2020-06-26", "2020-06-27",
+ "2020-06-28", "2020-06-29", "2020-06-30"], "duration": 112,
+ "holidays_norm": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+ "name": "model_dlpm_06242021_1635", "predict_window": 10, "train_window": 60, "version": "version_06242021_1635"},
+ "stats": {"a_": [0.04429662466026579, 0.20533941439740103], "a_1": [0.0594024062498474, 0.23536391091868636],
+ "a_2": [0.13875638124033587, 0.3443551863453624], "a_3": [0.18247821941727763, 0.3847745538180887],
+ "a_4": [0.23540840480301564, 0.4227143144991038], "a_5": [0.20678420677160178, 0.40351830940626193],
+ "a_6": [0.1328737568768233, 0.338151429839641], "g_": [0.03802539182225981, 0.1910382838856348],
+ "g_g_f": [0.31290878405353795, 0.46186909160791834], "g_g_m": [0.6370381966764169, 0.4790592824120438],
+ "g_g_x": [0.012027627461027505, 0.10846109268711399],
+ "holiday_stats": [0.0, 0.0], "ipl_": [1.0, 1.0], "ipl_1": [0.0, 1.0], "ipl_10": [0.0, 1.0], "ipl_11": [0.0, 1.0], "ipl_12": [0.0, 1.0], "ipl_13": [0.0, 1.0], "ipl_14": [0.0, 1.0], "ipl_15": [0.0, 1.0], "ipl_16": [0.0, 1.0], "ipl_17": [0.0, 1.0], "ipl_18": [0.0, 1.0], "ipl_19": [0.0, 1.0], "ipl_2": [0.0, 1.0], "ipl_20": [0.0, 1.0], "ipl_21": [0.0, 1.0], "ipl_22": [0.0, 1.0], "ipl_23": [0.0, 1.0], "ipl_24": [0.0, 1.0], "ipl_25": [0.0, 1.0], "ipl_26": [0.0, 1.0], "ipl_27": [...]
+ "page_popularity": [577.1606531140158, 5127.251109603019],
+ "price_cat_1": [0.6813400284793525, 0.4659575570248394], "price_cat_2": [0.1969572060256314, 0.3977003132674967],
+ "price_cat_3": [0.12170276549501612, 0.3269426904032709], "r_": [0.06428884716597416, 0.2445368641073571], "r_1": [0.017636910330801243, 0.13112898948090662], "r_10": [0.007311564935032408, 0.08479574334903354], "r_11": [0.0123057065964087, 0.10979549743303678], "r_12": [0.010682300938137547, 0.10238183510308607], "r_13": [0.012839921663653402, 0.11211387396512836], "r_14": [0.0043072282145927634, 0.0652208481532438], "r_15": [0.005510994725533859, 0.07372435219261837], [...]
+ "si_15e9ddce941b11e5bdec00163e291137": [0.0021734242674061304, 0.046569378305571674],
+ "si_17dd6d8098bf11e5bdec00163e291137": [8.064153488720677E-4, 0.028386043378890852],
+ "si_5cd1c663263511e6af7500163e291137": [0.024480251817432363, 0.15453491734752325], "si_66bcd2720e5011e79bc8fa163e05184e": [0.0747118339204077, 0.2629261931283321], "si_68bcd2720e5011e79bc8fa163e05184e": [0.00832196657423368, 0.09084457158709529], "si_71bcd2720e5011e79bc8fa163e05184e": [0.0031866896500037474, 0.056360839080046306], "si_7b0d7b55ab0c11e68b7900163e3e481d": [0.07978115865997153, 0.270954508123256], "si_a290af82884e11e5bdec00163e291137": [0.08873866446826051 [...]
+ }
+ days = model_stats['model']['days']
+
+ x = [
+ 4.919981, 4.912655, 4.912655, 5.1119876, 4.5217886, 5.638355, 5.673323, 5.874931, 5.6801724, 5.4026775,
+ 6.2422233, 6.150603, 5.918894, 6.251904, 6.0497336, 6.075346, 5.9939613, 6.222576, 6.1944056, 6.1025586,
+ 6.3261495, 5.811141, 5.3752785, 5.6903596, 5.4722705, 5.497168, 5.1357985, 4.919981, 5.117994, 4.9904327,
+ 5.0998664, 5.159055, 5.2983174, 4.7004805, 0.0, 4.1431346, 4.4998097, 4.6151204, 4.6728287, 4.356709,
+ 5.0238805, 5.749393, 5.4467373, 5.433722, 5.4764633, 4.9767337, 5.2832036, 5.996452, 6.011267, 6.089045,
+ 5.8318825, 5.556828, 5.590987, 5.755742, 5.8406415, 5.7525725, 5.733341, 4.4188404, 0.0, 4.454347, 3.988984,
+ 4.26268, 4.2904596, 4.804021, 4.4308167, 0.0, 6.0684257, 6.423247, 6.6795993, 6.6463904, 6.418365, 6.25575,
+ 6.1984787, 6.2841344, 6.350886, 6.4361506, 6.3733196, 6.668228, 6.2186003, 5.8230457, 6.2461066, 5.9839363,
+ 6.424869, 6.7730803, 6.510258, 6.591674, 6.767343, 6.666957, 5.529429, 5.075174, 4.248495, 0.0, 4.0943446,
+ 6.685861, 6.8330317, 6.7214255, 6.5971456, 6.629363, 6.72263, 6.5117455, 6.6795993, 6.8211074, 7.004882,
+ 6.9920964, 6.9641356, 6.991177, 6.6554403, 6.364751, 6.50129, 6.6346335, 6.43294, 6.9353704, 7.0030656,
+ 6.7968235, 7.2174435, 7.183871, 7.3225102, 7.010312, 7.4506607, 6.8855095, 6.163315, 0.0]
+
+ days = days[:-10]
+ x = x[:-10]
+ si = [-0.04667067527770996, -0.2841551601886749, -0.29444485902786255, -0.06350809335708618, -0.1994006484746933,
+ -0.11050120741128922, -0.028408868238329887, -0.15841242671012878, -0.09160664677619934,
+ -0.056540846824645996, -0.3120572865009308, -0.5284554362297058, -0.32392826676368713, -0.06355565786361694,
+ -0.13475105166435242, 6.289364814758301, -0.21588164567947388, -0.23473970592021942, -0.06135460361838341,
+ -0.06582210212945938, -0.15293772518634796, -0.1407172530889511, -0.09954438358545303, -0.05613924190402031,
+ -0.15412424504756927, -0.3370814323425293, -0.1369824856519699]
+
+ forward_offset = 0
+
+ response = predict(serving_url=URL, model_stats=model_stats, day_list=days, ucdoc_attribute_map={'uckey': 'magazinelock,1,3G,g_f,2,pt,1004,icc',
+ 'ts': [], 'price_cat': '1',
+ 'p': 462.7049255371094, 'a__n': 4.654261589050293, 'a_1_n': -0.25238537788391113, 'a_2_n': -0.40294551849365234,
+ 'a_3_n': -0.4742470979690552, 'a_4_n': -0.5568971633911133, 'a_5_n': -0.5124530792236328, 'a_6_n': -0.39294159412384033,
+ 't_UNKNOWN_n': -0.0056058494374156, 't_3G_n': -0.09904143214225769, 't_4G_n': -0.853390634059906, 't_WIFI_n': 0.8717950582504272,
+ 't_2G_n': -0.02846473827958107, 'g__n': 5.035506725311279,
+ 'g_g_f_n': -0.6774836778640747, 'g_g_m_n': -1.3297690153121948, 'g_g_x_n': -0.11089347302913666,
+ 'price_cat_1_n': 0.6838819980621338, 'price_cat_2_n': -0.49524027, 'price_cat_3_n': -0.37224495,
+ 'si_vec_n': si, 'r_vec_n': [], 'p_n': -0.02232302, 'ts_n': x, 'page_ix': 'native,d9jucwkpr3,WIFI,,,CPM,,60-1'}, forward_offset=forward_offset)
+
+ print(response)
diff --git a/Processes/dlpredictor/VERSION.md b/Processes/dlpredictor/VERSION.md
index b8bc3e0..c746b0a 100644
--- a/Processes/dlpredictor/VERSION.md
+++ b/Processes/dlpredictor/VERSION.md
@@ -1,4 +1,7 @@
### 1.6
1. Add residency and IPL features
2. Add tag to config file. The whole set of tmp and final artifacts is named by product_tag and pipeline_tag. The user does not need to review the names of those tables anymore.
-3. Read model statistics from hive instead of elasticsearch.
\ No newline at end of file
+3. Read model statistics from hive instead of elasticsearch.
+
+### 1.7
+1. Remove re-distribution of traffic for mapped IPLs.
diff --git a/Processes/dlpredictor/conf/config.yml b/Processes/dlpredictor/conf/config.yml
index dcfff92..081367c 100644
--- a/Processes/dlpredictor/conf/config.yml
+++ b/Processes/dlpredictor/conf/config.yml
@@ -1,23 +1,18 @@
log_level: 'info'
product_tag: 'dlpredictor'
-pipeline_tag: 'dlpm_10052021_1400_reza'
+pipeline_tag: '110221_no_residency_no_mapping'
#input tables from dlpm pipeline
-factdata_table: 'factdata_hq_09222020'
-area_map_table: 'dlpm_10052021_1400_tmp_area_map' # this raw data, with filtered si, remapped r and ipl and partitioned by bucket-id
-distribution_table: 'dlpm_10052021_1400_tmp_distribution'
-norm_table: 'dlpm_10052021_1400_trainready'
-model_stat_table: 'dlpm_10052021_1400_model_stat'
-region_mapping_table: 'region_mapping'
-bucket_size: 1
-bucket_step: 1
-ipl_dist_table: '{product_tag}_{pipeline_tag}_ipl_dist_map'
-unique_original_uckey_table: '{product_tag}_{pipeline_tag}_unique_original_uckey'
-skip_ipl_reverse_mapping: true # this makes main_spark_es to process mapped uckeys and skip reverse mapping.
+area_map_table: 'dlpm_110221_no_residency_no_mapping_tmp_area_map' # the raw data, with filtered si and remapped r and ipl, partitioned by bucket_id
+distribution_table: 'dlpm_110221_no_residency_no_mapping_tmp_distribution'
+norm_table: 'dlpm_110221_no_residency_no_mapping_trainready'
+model_stat_table: 'dlpm_110221_no_residency_no_mapping_model_stat'
+bucket_size: 10
+bucket_step: 2
condition: ''
-yesterday: '2021-07-31'
-serving_url: 'http://10.193.217.105:8508/v1/models/dl_20210706:predict'
+yesterday: '2021-07-21'
+serving_url: 'http://10.193.217.105:8506/v1/models/dl_no_mapped_ipl:predict'
config_table: '{product_tag}_{pipeline_tag}_config'
diff --git a/Processes/dlpredictor/dlpredictor/main_build_ipl_dist.py b/Processes/dlpredictor/dlpredictor/main_build_ipl_dist.py
deleted file mode 100644
index c086c36..0000000
--- a/Processes/dlpredictor/dlpredictor/main_build_ipl_dist.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0.html
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import math
-import pickle
-import yaml
-import argparse
-
-from pyspark import SparkContext, SparkConf, Row
-from pyspark.sql.functions import concat_ws, count, lit, col, udf, expr, collect_list, explode, sum, array, split
-from pyspark.sql.types import BooleanType, IntegerType, StringType, FloatType
-from pyspark.sql import HiveContext
-from pyspark.sql.window import Window
-from dlpredictor.configutil import *
-import hashlib
-
-'''
-spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G dlpredictor/main_build_ipl_dist.py conf/config.yml
-'''
-
-
-def __save_as_table(df, table_name, hive_context, create_table):
- if create_table:
- command = """
- DROP TABLE IF EXISTS {}
- """.format(table_name)
-
- hive_context.sql(command)
-
- df.createOrReplaceTempView("r900_temp_table")
-
- command = """
- CREATE TABLE IF NOT EXISTS {} AS SELECT * FROM r900_temp_table
- """.format(table_name)
-
- hive_context.sql(command)
-
-
-def run(hive_context, conditions, factdata_table, ipl_dist_table, unique_original_uckey_table, region_mapping_table, bucket_size, bucket_step):
-
- # ts will be counts from yesterday-(past_days) to yesterday
- mapping_df = hive_context.sql('SELECT old,new FROM {}'.format(region_mapping_table))
-
- start_bucket = 0
- df_union = None
- df_distinct_uckey = None
-
- while True:
-
- end_bucket = min(bucket_size, start_bucket + bucket_step)
-
- if start_bucket > end_bucket:
- break
-
- # Read factdata table
- command = """
- SELECT count_array,uckey,bucket_id FROM {} WHERE bucket_id BETWEEN {} AND {}
- """.format(factdata_table, str(start_bucket), str(end_bucket))
-
- if len(conditions) > 0:
- command = command + " and {}".format(' and '.join(conditions))
-
- start_bucket = end_bucket + 1
-
- df = hive_context.sql(command)
- # [Row(count_array=[u'0:0', u'1:0', u'2:0', u'3:0'], day=u'2018-03-09', hour=0, uckey=u'banner,1,3G,g_f,1,pt,1002,icc')]
-
- # extract ipl
- df = df.withColumn('ipl', split(df['uckey'], ',').getItem(7).cast(StringType()))
-
- def _udf_helper(count_arrays):
- result = 0
- for count_array in count_arrays:
- for item in count_array:
- imp = int(item.split(':')[1])
- result += imp
- return result
-
- df_uckey = df.select('uckey')
- if df_distinct_uckey is None:
- df_distinct_uckey = df_uckey.select('uckey').distinct()
- else:
- df_distinct_uckey = df_distinct_uckey.union(df_uckey)
- df_distinct_uckey = df_distinct_uckey.select('uckey').distinct()
-
- df = df.groupby('ipl').agg(udf(_udf_helper, IntegerType())(collect_list('count_array')).alias('imp'))
- if df_union is None:
- df_union = df
- else:
- df_union = df_union.union(df)
-
- df = df_union.groupby('ipl').agg(sum('imp').alias('region_imp'))
- df = mapping_df.join(df, mapping_df.old == df.ipl, 'outer')
- df = df.withColumn('region_total_imp', sum('region_imp').over(Window.partitionBy('new')))
- df = df.withColumn('ratio', udf(lambda x, y: float(x)/y if x and y else 0, FloatType())('region_imp', 'region_total_imp'))
-
- __save_as_table(df=df, table_name=ipl_dist_table, hive_context=hive_context, create_table=True)
-
- __save_as_table(df=df_distinct_uckey, table_name=unique_original_uckey_table, hive_context=hive_context, create_table=True)
-
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Prepare data')
- parser.add_argument('config_file')
- args = parser.parse_args()
-
- # Load config file
- with open(args.config_file, 'r') as ymlfile:
- cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
- resolve_placeholder(cfg)
-
- sc = SparkContext()
- hive_context = HiveContext(sc)
- sc.setLogLevel(cfg['log_level'])
-
- hive_context.setConf("hive.exec.dynamic.partition", "true")
- hive_context.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
- factdata_table = cfg['factdata_table']
- region_mapping_table = cfg['region_mapping_table']
- bucket_size = cfg['bucket_size']
- bucket_step = cfg['bucket_step']
- conditions = cfg['condition']
- ipl_dist_table = cfg['ipl_dist_table']
- unique_original_uckey_table = cfg['unique_original_uckey_table']
-
- run(hive_context=hive_context, conditions=conditions, factdata_table=factdata_table,
- ipl_dist_table=ipl_dist_table, unique_original_uckey_table=unique_original_uckey_table, region_mapping_table=region_mapping_table,
- bucket_size=bucket_size, bucket_step=bucket_step)
-
- sc.stop()
diff --git a/Processes/dlpredictor/dlpredictor/main_spark_es.py b/Processes/dlpredictor/dlpredictor/main_spark_es.py
index f2fa82a..0238f74 100644
--- a/Processes/dlpredictor/dlpredictor/main_spark_es.py
+++ b/Processes/dlpredictor/dlpredictor/main_spark_es.py
@@ -108,51 +108,6 @@ def __save_as_table(df, table_name, hive_context, create_table):
hive_context.sql(command)
-def ipl_revrse_mapping(df, ipl_dist_map_brodcast, df_uckey_distinct):
-
- df = df.withColumn('ipl', split(df['uckey'], ',').getItem(7).cast(StringType()))
- df = df.filter(udf(lambda ipl: ipl in ipl_dist_map_brodcast.value, BooleanType())(df.ipl))
- df = df.withColumn('real_ipl_ratio_map', udf(lambda ipl: ipl_dist_map_brodcast.value[ipl], MapType(StringType(), FloatType(), False))(df.ipl))
-
- # +-------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------------------------------------------------+---+------------------+
- # |cluster_uckey|price_cat|day_prediction_map |ratio |uckey |ipl|real_ipl_ratio_map|
- # +-------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------------------------------------------------+---+------------------+
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0] |0.00800733 |native,z041bf6g4s,WIFI,g_f,5,CPM,40,40 |40 |[40 -> 1.0] |
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0] |0.010742836 |native,z041bf6g4s,WIFI,g_m,3,CPM,30,30 |30 |[30 -> 1.0] |
-
- df = df.select('cluster_uckey', 'price_cat', 'day_prediction_map', 'ratio', 'uckey', 'ipl', explode('real_ipl_ratio_map')).withColumnRenamed(
- "key", "real_ipl").withColumnRenamed("value", "ipl_ratio")
-
- # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+
- # |cluster_uckey|price_cat|day_prediction_map |ratio |uckey |ipl|real_ipl|ipl_ratio |
- # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.033795446|native,z041bf6g4s,WIFI,g_f,4,CPM,57,57|57 |57 |1.0 |
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.00800733 |native,z041bf6g4s,WIFI,g_f,5,CPM,40,40|40 |40 |1.0 |
-
- # change uckey with new ipl, this for ipl fix not region
- def __fix_uckey_ipl(uckey, ipl):
- l = uckey.split(',')
- l[7] = str(ipl)
- return ','.join(l)
- df = df.withColumn('uckey', udf(__fix_uckey_ipl, StringType())(df.uckey, df.real_ipl))
-
- # filter uckeys to make sure we predict for valid uckeys
- df = df.join(df_uckey_distinct, on='uckey', how='inner')
-
- # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+
- # |cluster_uckey|price_cat|day_prediction_map |ratio |uckey |ipl|real_ipl|ipl_ratio |
- # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.010742836|native,z041bf6g4s,WIFI,g_m,3,CPM,30,30|30 |30 |1.0 |
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.00800733 |native,z041bf6g4s,WIFI,g_f,5,CPM,40,40|40 |40 |1.0 |
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.042944785|native,z041bf6g4s,4G,g_m,4,CPM,3,3 |3 |3 |1.0 |
- # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.011951239|native,z041bf6g4s,4G,g_f,5,CPM,71,101 |71 |101 |0.08843476 |
-
- # update ratio
- df = df.withColumn('ratio', udf(lambda r1, r2: float(r1*r2) if r1 and r2 else float(0), FloatType())(df.ratio, df.ipl_ratio))
-
- return df
-
-
def run(cfg):
# os.environ[
@@ -181,9 +136,6 @@ def run(cfg):
norm_table = cfg['norm_table']
traffic_dist = cfg['traffic_dist']
model_stat_table = cfg['model_stat_table']
- ipl_dist_table = cfg['ipl_dist_table']
- unique_original_uckey_table = cfg['unique_original_uckey_table']
- skip_ipl_reverse_mapping = bool(cfg['skip_ipl_reverse_mapping'])
prediction_table_name = cfg['es_predictions_index']
yesterday = cfg['yesterday']
@@ -205,35 +157,6 @@ def run(cfg):
df_dist.cache()
df_dist.count()
- if not skip_ipl_reverse_mapping:
- command = """
- SELECT
- DIST.old as mapped_ipl,
- DIST.ipl as real_ipl,
- DIST.ratio
- FROM {} AS DIST
- """.format(ipl_dist_table)
- df = hive_context.sql(command)
- ipl_dist_list = df.collect()
- ipl_dist_map = {}
- for _ in ipl_dist_list:
- mapped_ipl = _['mapped_ipl']
- if not mapped_ipl:
- continue
- mapped_ipl = str(mapped_ipl)
- real_ipl = _['real_ipl']
- ratio = float(0)
- if _['ratio']:
- ratio = float(_['ratio'])
- if mapped_ipl not in ipl_dist_map:
- ipl_dist_map[mapped_ipl] = {}
- ipl_dist_map[mapped_ipl][real_ipl] = ratio
-
- ipl_dist_map_brodcast = sc.broadcast(ipl_dist_map)
-
- # Get original uckeys
- df_uckey_distinct = hive_context.sql('SELECT uckey FROM {}'.format(unique_original_uckey_table))
-
# Read norm table
# DataFrame[uckey: string, ts: array<int>, p: float, a__n: float, a_1_n: float, a_2_n: float, a_3_n: float, a_4_n: float, a_5_n: float, a_6_n: float, t_UNKNOWN_n: float, t_3G_n: float, t_4G_n: float, t_WIFI_n: float, t_2G_n: float, g__n: float, g_g_f_n: float, g_g_m_n: float, g_g_x_n: float, price_cat_1_n: float, price_cat_2_n: float, price_cat_3_n: float, si_vec_n: array<float>, r_vec_n: array<float>, p_n: float, ts_n: array<float>]
command = """
@@ -244,9 +167,7 @@ def run(cfg):
t_UNKNOWN_n,t_3G_n,t_4G_n,t_WIFI_n,t_2G_n,
g__n, g_g_f_n, g_g_m_n, g_g_x_n,
price_cat_1_n, price_cat_2_n, price_cat_3_n,
- si_vec_n,
- r_vec_n,
- ipl_vec_n
+ si_vec_n
FROM {}
""".format(norm_table)
df_norm = hive_context.sql(command)
@@ -357,9 +278,6 @@ def run(cfg):
# |1009 |2 |[2020-06-26 -> 169.0, 2020-06-27 -> 170.0, 2020-06-24 -> 158.0, 2020-06-25 -> 155.0, 2020-06-28 -> 146.0, 2020-06-29 -> 127.0, 2020-06-30 -> 127.0, 2020-06-22 -> 171.0, 2020-06-23 -> 159.0, 2020-06-21 -> 227.0]|0.00800733|native,z041bf6g4s,WIFI,g_f,5,CPM,40,40|
# +-------------+---------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------------+
- if not skip_ipl_reverse_mapping:
- df = ipl_revrse_mapping(df, ipl_dist_map_brodcast=ipl_dist_map_brodcast, df_uckey_distinct=df_uckey_distinct)
-
# [Row(ucdoc_elements=Row(price_cat=u'2', ratio=0.11989551782608032, day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0}), uckey=u'native,66bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPC,5')]
ucdoc_elements_type = StructType([StructField('price_cat', StringType(), False), StructField(
'ratio', FloatType(), False), StructField('day_prediction_map', MapType(StringType(), FloatType()), False)])
diff --git a/Processes/dlpredictor/dlpredictor/show_config.py b/Processes/dlpredictor/dlpredictor/show_config.py
index 3a99776..bfa12c3 100644
--- a/Processes/dlpredictor/dlpredictor/show_config.py
+++ b/Processes/dlpredictor/dlpredictor/show_config.py
@@ -63,7 +63,7 @@ def run(cfg, hive_context):
product_tag = cfg['product_tag']
pipeline_tag = cfg['pipeline_tag']
- factdata = cfg['factdata_table']
+ area_map_table = cfg['area_map_table']
distribution_table = cfg['distribution_table']
norm_table = cfg['norm_table']
model_stat_table = cfg['model_stat_table']
@@ -88,8 +88,8 @@ def run(cfg, hive_context):
print('')
command = "SELECT * FROM {}"
- df = hive_context.sql(command.format(factdata))
- print('Factdata schema')
+ df = hive_context.sql(command.format(area_map_table))
+    print('Filtered Factdata table schema')
df.printSchema()
command = "SELECT * FROM {}"
diff --git a/Processes/dlpredictor/experiments/elasticsearch-hadoop-6.8.0.jar b/Processes/dlpredictor/experiments/elasticsearch-hadoop-6.8.0.jar
new file mode 100644
index 0000000..76cde4a
Binary files /dev/null and b/Processes/dlpredictor/experiments/elasticsearch-hadoop-6.8.0.jar differ
diff --git a/Processes/dlpredictor/experiments/scripts/test_spark_es_big_write.py b/Processes/dlpredictor/experiments/scripts/test_spark_es_big_write.py
new file mode 100644
index 0000000..73753eb
--- /dev/null
+++ b/Processes/dlpredictor/experiments/scripts/test_spark_es_big_write.py
@@ -0,0 +1,1836 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This file tests pushing a LARGE Spark data frame into Elasticsearch.
+Each row in the dataframe becomes a document in Elasticsearch.
+"""
+# Reza
+
+import os
+import json
+
+from pyspark import SparkContext, SparkConf
+from pyspark.sql import SQLContext
+from pyspark.sql import HiveContext
+
+from pyspark.sql.functions import udf, expr, collect_list, struct
+from pyspark.sql.types import StringType, ArrayType, MapType, FloatType, StructField, StructType
+
+'''
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 32G --driver-memory 32G --jars elasticsearch-hadoop-6.8.0.jar scripts/test_spark_es_big_write.py
+'''
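+# A minimal sketch of how a dataframe is typically written to Elasticsearch
+# through the es-hadoop connector bundled with this experiment. This block is
+# an illustration added here, not part of the original test; the index name
+# and node address are placeholders:
+#
+#   df.write.format("org.elasticsearch.spark.sql") \
+#       .option("es.nodes", "es-host") \
+#       .option("es.port", "9200") \
+#       .option("es.resource", "test_index/doc") \
+#       .mode("append") \
+#       .save()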
+
+MDOC = {
+ "a": "5",
+ "g": "g_m",
+ "m": "native",
+ "predictions": {
+ "2020-06-21": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.52906338707,
+ "total": 0.52906338707
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.326346659629,
+ "total": 0.326346659629
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.2899849149,
+ "total": 0.2899849149
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.263622649876,
+ "total": 0.263622649876
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.301802481923,
+ "total": 0.301802481923
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.494519729477,
+ "total": 0.494519729477
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.931769710793,
+ "total": 0.931769710793
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.18902905524,
+ "total": 1.18902905524
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.19630140433,
+ "total": 1.19630140433
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.965404324726,
+ "total": 0.965404324726
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.805412647628,
+ "total": 0.805412647628
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.807230734765,
+ "total": 0.807230734765
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.884499442518,
+ "total": 0.884499442518
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.789958905968,
+ "total": 0.789958905968
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.768141859058,
+ "total": 0.768141859058
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.769050902717,
+ "total": 0.769050902717
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.839956305221,
+ "total": 0.839956305221
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.985403284318,
+ "total": 0.985403284318
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.02085598557,
+ "total": 1.02085598557
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.96994954284,
+ "total": 0.96994954284
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.940860147021,
+ "total": 0.940860147021
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.879045180745,
+ "total": 0.879045180745
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.729962027011,
+ "total": 0.729962027011
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.528154343411,
+ "total": 0.528154343411
+ }
+ ],
+ "date": "2020-06-21"
+ },
+ "2020-06-22": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.57477844577,
+ "total": 0.57477844577
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.354545467306,
+ "total": 0.354545467306
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.315041794152,
+ "total": 0.315041794152
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.286401631011,
+ "total": 0.286401631011
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.327880487912,
+ "total": 0.327880487912
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.537249956164,
+ "total": 0.537249956164
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.01228162688,
+ "total": 1.01228162688
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.29177011497,
+ "total": 1.29177011497
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.29967084976,
+ "total": 1.29967084976
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.04882252461,
+ "total": 1.04882252461
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.875006362415,
+ "total": 0.875006362415
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.876981545964,
+ "total": 0.876981545964
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.960926851639,
+ "total": 0.960926851639
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.858217301161,
+ "total": 0.858217301161
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.834515097189,
+ "total": 0.834515097189
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.835502689063,
+ "total": 0.835502689063
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.91253485202,
+ "total": 0.91253485202
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.07054954483,
+ "total": 1.07054954483
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.10906562631,
+ "total": 1.10906562631
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.05376048378,
+ "total": 1.05376048378
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.02215754522,
+ "total": 1.02215754522
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.955001300597,
+ "total": 0.955001300597
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.793036240288,
+ "total": 0.793036240288
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.573790853896,
+ "total": 0.573790853896
+ }
+ ],
+ "date": "2020-06-22"
+ },
+ "2020-06-23": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.559059800944,
+ "total": 0.559059800944
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.344849602202,
+ "total": 0.344849602202
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.306426248277,
+ "total": 0.306426248277
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.278569316581,
+ "total": 0.278569316581
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.318913838289,
+ "total": 0.318913838289
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.52255761461,
+ "total": 0.52255761461
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.984598446562,
+ "total": 0.984598446562
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.2564436761,
+ "total": 1.2564436761
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.26412834703,
+ "total": 1.26412834703
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.02014004901,
+ "total": 1.02014004901
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.851077291428,
+ "total": 0.851077291428
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.852998459018,
+ "total": 0.852998459018
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.934648086325,
+ "total": 0.934648086325
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.834747365851,
+ "total": 0.834747365851
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.811693353419,
+ "total": 0.811693353419
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.812653937311,
+ "total": 0.812653937311
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.887579477762,
+ "total": 0.887579477762
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.04127289365,
+ "total": 1.04127289365
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.07873566388,
+ "total": 1.07873566388
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.02494296827,
+ "total": 1.02494296827
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.994204285092,
+ "total": 0.994204285092
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.928884583169,
+ "total": 0.928884583169
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.771348831711,
+ "total": 0.771348831711
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.558099217053,
+ "total": 0.558099217053
+ }
+ ],
+ "date": "2020-06-23"
+ },
+ "2020-06-24": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.553689263963,
+ "total": 0.553689263963
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.341536848292,
+ "total": 0.341536848292
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.303482603437,
+ "total": 0.303482603437
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.275893275817,
+ "total": 0.275893275817
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.315850233,
+ "total": 0.315850233
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.517537731246,
+ "total": 0.517537731246
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.975140026621,
+ "total": 0.975140026621
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.24437380948,
+ "total": 1.24437380948
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.2519846586,
+ "total": 1.2519846586
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.01034020317,
+ "total": 1.01034020317
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.842901525507,
+ "total": 0.842901525507
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.844804237645,
+ "total": 0.844804237645
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.925669508176,
+ "total": 0.925669508176
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.826728471287,
+ "total": 0.826728471287
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.803895924298,
+ "total": 0.803895924298
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.804847280462,
+ "total": 0.804847280462
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.879053058224,
+ "total": 0.879053058224
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.03127003783,
+ "total": 1.03127003783
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.06837292672,
+ "total": 1.06837292672
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.0150969838,
+ "total": 1.0150969838
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.984653587882,
+ "total": 0.984653587882
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.919961371381,
+ "total": 0.919961371381
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.763938967114,
+ "total": 0.763938967114
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.552737907798,
+ "total": 0.552737907798
+ }
+ ],
+ "date": "2020-06-24"
+ },
+ "2020-06-25": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.546877851205,
+ "total": 0.546877851205
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.337335306747,
+ "total": 0.337335306747
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.299749200224,
+ "total": 0.299749200224
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.272499272897,
+ "total": 0.272499272897
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.31196468483,
+ "total": 0.31196468483
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.511171049906,
+ "total": 0.511171049906
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.963143981818,
+ "total": 0.963143981818
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.22906568597,
+ "total": 1.22906568597
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.23658290742,
+ "total": 1.23658290742
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.997911130412,
+ "total": 0.997911130412
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.832532261413,
+ "total": 0.832532261413
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.834411566636,
+ "total": 0.834411566636
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.914282043207,
+ "total": 0.914282043207
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.816558165986,
+ "total": 0.816558165986
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.794006501997,
+ "total": 0.794006501997
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.794946154703,
+ "total": 0.794946154703
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.868239062713,
+ "total": 0.868239062713
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.01858348899,
+ "total": 1.01858348899
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.05522994299,
+ "total": 1.05522994299
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.00260939375,
+ "total": 1.00260939375
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.972540508495,
+ "total": 0.972540508495
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.908644127163,
+ "total": 0.908644127163
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.754541090064,
+ "total": 0.754541090064
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.5459381985,
+ "total": 0.5459381985
+ }
+ ],
+ "date": "2020-06-25"
+ },
+ "2020-06-26": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.52068010983,
+ "total": 0.52068010983
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.321175531573,
+ "total": 0.321175531573
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.2853899571,
+ "total": 0.2853899571
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.259445415513,
+ "total": 0.259445415513
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.29702026879,
+ "total": 0.29702026879
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.486683813982,
+ "total": 0.486683813982
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.917005347958,
+ "total": 0.917005347958
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.17018828784,
+ "total": 1.17018828784
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.17734540287,
+ "total": 1.17734540287
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.950107004404,
+ "total": 0.950107004404
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.792650476435,
+ "total": 0.792650476435
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.79443975506,
+ "total": 0.79443975506
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.870484101017,
+ "total": 0.870484101017
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.777441607136,
+ "total": 0.777441607136
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.755970262381,
+ "total": 0.755970262381
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.756864901783,
+ "total": 0.756864901783
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.826646772283,
+ "total": 0.826646772283
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.969789070355,
+ "total": 0.969789070355
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.00468000561,
+ "total": 1.00468000561
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.954580201235,
+ "total": 0.954580201235
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.925951741621,
+ "total": 0.925951741621
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.865116264784,
+ "total": 0.865116264784
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.718395409103,
+ "total": 0.718395409103
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.519785470428,
+ "total": 0.519785470428
+ }
+ ],
+ "date": "2020-06-26"
+ },
+ "2020-06-27": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.48334832837,
+ "total": 0.48334832837
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.298147851951,
+ "total": 0.298147851951
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.264928035648,
+ "total": 0.264928035648
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.24084366874,
+ "total": 0.24084366874
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.275724475934,
+ "total": 0.275724475934
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.451789502791,
+ "total": 0.451789502791
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.851257794708,
+ "total": 0.851257794708
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.0862879955,
+ "total": 1.0862879955
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.09293195889,
+ "total": 1.09293195889
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.881986124843,
+ "total": 0.881986124843
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.735818932842,
+ "total": 0.735818932842
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.737479923565,
+ "total": 0.737479923565
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.808072033397,
+ "total": 0.808072033397
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.721700510776,
+ "total": 0.721700510776
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.701768620927,
+ "total": 0.701768620927
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.702599116372,
+ "total": 0.702599116372
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.767377758422,
+ "total": 0.767377758422
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.900257023801,
+ "total": 0.900257023801
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.932646344826,
+ "total": 0.932646344826
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.886138601902,
+ "total": 0.886138601902
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.859562748826,
+ "total": 0.859562748826
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.803089060894,
+ "total": 0.803089060894
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.666887813734,
+ "total": 0.666887813734
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.482517832925,
+ "total": 0.482517832925
+ }
+ ],
+ "date": "2020-06-27"
+ },
+ "2020-06-28": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.516619459917,
+ "total": 0.516619459917
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.318670766421,
+ "total": 0.318670766421
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.283164274416,
+ "total": 0.283164274416
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.257422067618,
+ "total": 0.257422067618
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.294703884304,
+ "total": 0.294703884304
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.482888292414,
+ "total": 0.482888292414
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.90985385971,
+ "total": 0.90985385971
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.16106229112,
+ "total": 1.16106229112
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.16816358967,
+ "total": 1.16816358967
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.942697364873,
+ "total": 0.942697364873
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.786468799764,
+ "total": 0.786468799764
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.788244124266,
+ "total": 0.788244124266
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.863695419978,
+ "total": 0.863695419978
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.771378540515,
+ "total": 0.771378540515
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.75007464524,
+ "total": 0.75007464524
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.75096230758,
+ "total": 0.75096230758
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.820199967267,
+ "total": 0.820199967267
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.962225935467,
+ "total": 0.962225935467
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.99684476531,
+ "total": 0.99684476531
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.947135676396,
+ "total": 0.947135676396
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.918730482756,
+ "total": 0.918730482756
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.858369446115,
+ "total": 0.858369446115
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.712792828554,
+ "total": 0.712792828554
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.515731797577,
+ "total": 0.515731797577
+ }
+ ],
+ "date": "2020-06-28"
+ },
+ "2020-06-29": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.484789204146,
+ "total": 0.484789204146
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.299036639585,
+ "total": 0.299036639585
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.265717794019,
+ "total": 0.265717794019
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.241561630896,
+ "total": 0.241561630896
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.276546418816,
+ "total": 0.276546418816
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.453136300767,
+ "total": 0.453136300767
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.85379541957,
+ "total": 0.85379541957
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.0895262524,
+ "total": 1.0895262524
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.09619002164,
+ "total": 1.09619002164
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.884615351773,
+ "total": 0.884615351773
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.738012431016,
+ "total": 0.738012431016
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.739678373202,
+ "total": 0.739678373202
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.810480920218,
+ "total": 0.810480920218
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.723851921512,
+ "total": 0.723851921512
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.703860614106,
+ "total": 0.703860614106
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.704693585283,
+ "total": 0.704693585283
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.769665334395,
+ "total": 0.769665334395
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.902940716826,
+ "total": 0.902940716826
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.935426591383,
+ "total": 0.935426591383
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.88878020749,
+ "total": 0.88878020749
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.862125131004,
+ "total": 0.862125131004
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.805483093324,
+ "total": 0.805483093324
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.668875826187,
+ "total": 0.668875826187
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.483956232969,
+ "total": 0.483956232969
+ }
+ ],
+ "date": "2020-06-29"
+ },
+ "2020-06-30": {
+ "hours": [
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.494744345869,
+ "total": 0.494744345869
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.305177354151,
+ "total": 0.305177354151
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.271174306407,
+ "total": 0.271174306407
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.246522096702,
+ "total": 0.246522096702
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.282225296911,
+ "total": 0.282225296911
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.462441450418,
+ "total": 0.462441450418
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.871328100437,
+ "total": 0.871328100437
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.11189966369,
+ "total": 1.11189966369
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 1.11870027337,
+ "total": 1.11870027337
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.902780919656,
+ "total": 0.902780919656
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.753167509307,
+ "total": 0.753167509307
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.754867661601,
+ "total": 0.754867661601
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.82712413825,
+ "total": 0.82712413825
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.738716213875,
+ "total": 0.738716213875
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.71831438516,
+ "total": 0.71831438516
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.719164461392,
+ "total": 0.719164461392
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.785470404758,
+ "total": 0.785470404758
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.921482595907,
+ "total": 0.921482595907
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.95463556759,
+ "total": 0.95463556759
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.907031300646,
+ "total": 0.907031300646
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.879828862416,
+ "total": 0.879828862416
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.822023681029,
+ "total": 0.822023681029
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.682611184952,
+ "total": 0.682611184952
+ },
+ {
+ "h2": 0.0,
+ "h3": 0.0,
+ "h0": 0.0,
+ "h1": 0.493894269637,
+ "total": 0.493894269637
+ }
+ ],
+ "date": "2020-06-30"
+ }
+ },
+ "ipl": "40",
+ "si": "66bcd2720e5011e79bc8fa163e05184e",
+ "r": "7",
+ "t": "WIFI",
+ "pm": "CPC",
+ "uckey": 0
+}
+
+TABLE_NAME = 'reza_big_data_test'
+
+
+def write_data_into_table(sc):
+    # Builds 1,000,000 synthetic documents (10 batches of 100,000) from MDOC
+    # and saves them to Hive; the first batch overwrites the table, the
+    # remaining batches append to it.
+    hive_context = HiveContext(sc)
+
+    k = 0
+    for i in range(10):
+        data = []
+        for _ in range(100000):
+            k += 1
+            MDOC['uckey'] = str(k)
+            mdoc = json.dumps(MDOC, default=lambda x: x.__dict__)
+            data.append((str(k), mdoc))
+
+        df = hive_context.createDataFrame(data, ['uckey', 'ucdoc'])
+        _mode = 'overwrite' if i == 0 else 'append'
+        df.write.option("header", "true").option("encoding", "UTF-8").mode(_mode).format('hive').saveAsTable(TABLE_NAME)
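+
+
+def check_row_count(sc, expected=10 * 100000):
+    # Editor's sketch, not part of the original commit; the helper name is
+    # hypothetical. Sanity-checks that write_data_into_table produced the
+    # expected number of rows in the Hive table.
+    hive_context = HiveContext(sc)
+    row = hive_context.sql("SELECT COUNT(*) FROM {}".format(TABLE_NAME)).collect()[0]
+    assert row[0] == expected, 'expected {} rows, found {}'.format(expected, row[0])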
+
+
+def test_push(sc, es_write_conf):
+    # Reads the (uckey, ucdoc) rows back from the Hive table, wraps each row
+    # as a JSON document keyed by uckey, and bulk-upserts the documents into
+    # Elasticsearch through the elasticsearch-hadoop output format.
+    hive_context = HiveContext(sc)
+
+ command = "SELECT uckey,ucdoc FROM {}".format(TABLE_NAME)
+ df = hive_context.sql(command)
+
+ def format_data(x, field_name):
+ _doc = {'uckey': x[0], field_name: json.loads(x[1])}
+ return (x[0], json.dumps(_doc))
+
+ rdd = df.rdd.map(lambda x: format_data(x, 'ucdoc'))
+
+ rdd.saveAsNewAPIHadoopFile(
+ path='-',
+ outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
+ keyClass="org.apache.hadoop.io.NullWritable",
+ valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
+ conf=es_write_conf)
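+
+
+def check_es_doc_count(es_host='10.213.37.41', es_port='9200', index='reza_spark_es_test'):
+    # Editor's sketch, not part of the original commit; the helper name is
+    # hypothetical and it assumes the requests library is available. Asks the
+    # Elasticsearch _count API how many documents the upsert produced.
+    import requests
+    url = 'http://{}:{}/{}/_count'.format(es_host, es_port, index)
+    return requests.get(url).json().get('count', 0)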
+
+
+if __name__ == '__main__':
+
+ sc = SparkContext()
+
+ es_write_conf = {"es.nodes": '10.213.37.41', "es.port": '9200',
+ "es.resource": 'reza_spark_es_test/doc',
+ "es.batch.size.bytes": "1000000",
+ "es.batch.size.entries": "1000",
+ "es.input.json": "yes", "es.mapping.id": "uckey",
+ "es.nodes.wan.only": "true", "es.write.operation": "upsert"}
+
+    # write_data_into_table(sc)  # uncomment on the first run to build the Hive table
+ test_push(sc=sc, es_write_conf=es_write_conf)
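+
+# Usage sketch (editor's assumption; the flags mirror the project's run.sh):
+#   spark-submit --master yarn --jars lib/elasticsearch-hadoop-6.8.0.jar \
+#       experiments/scripts/test_spark_es_big_write.py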
diff --git a/Processes/dlpredictor/experiments/scripts/test_spark_es_write.py b/Processes/dlpredictor/experiments/scripts/test_spark_es_write.py
new file mode 100644
index 0000000..4f03e8a
--- /dev/null
+++ b/Processes/dlpredictor/experiments/scripts/test_spark_es_write.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script tests pushing a Spark DataFrame into Elasticsearch.
+Each row of the DataFrame becomes one document in Elasticsearch.
+"""
+# Reza
+
+import os
+import json
+
+from pyspark import SparkContext, SparkConf
+from pyspark.sql import SQLContext
+
+
+def test_push(sc, es_write_conf):
+
+ data = [{"days": {'key13': 'value13'}, 'doc_id': "1230112"},
+ {"days": {'key22': 'value22'}, 'doc_id': "4560112"}]
+ rdd = sc.parallelize(data)
+
+ def format_data(x):
+ return (x['doc_id'], json.dumps(x))
+
+ rdd = rdd.map(lambda x: format_data(x))
+
+ rdd.saveAsNewAPIHadoopFile(
+ path='-',
+ outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
+ keyClass="org.apache.hadoop.io.NullWritable",
+ valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
+ conf=es_write_conf)
+
+
+if __name__ == '__main__':
+
+    os.environ['PYSPARK_SUBMIT_ARGS'] = (
+        '--jars ~/elasticsearch-hadoop-6.5.2/dist/elasticsearch-hadoop-6.5.2.jar pyspark-shell')
+
+ sc = SparkContext()
+
+ es_write_conf = {"es.nodes": '10.124.243.233', "es.port": '9200', "es.resource": 'tbr_spark_es_test_02082019/doc',
+ "es.batch.size.entries": '5', "es.input.json": "yes", "es.mapping.id": "doc_id",
+ "es.nodes.wan.only": "true", "es.write.operation": "upsert"}
+ test_push(sc=sc, es_write_conf=es_write_conf)
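+
+# Usage sketch (editor's assumption): with PYSPARK_SUBMIT_ARGS unset, pass the
+# connector jar explicitly, e.g.
+#   spark-submit --jars elasticsearch-hadoop-6.5.2.jar test_spark_es_write.py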
diff --git a/Processes/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg b/Processes/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg
index e7f0366..c5791cc 100644
Binary files a/Processes/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg and b/Processes/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg differ
diff --git a/Processes/dlpredictor/run.sh b/Processes/dlpredictor/run.sh
index 004b52e..692239e 100644
--- a/Processes/dlpredictor/run.sh
+++ b/Processes/dlpredictor/run.sh
@@ -5,26 +5,17 @@ SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
# Save the configuration
if true
then
- spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --py-files $SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg $SCRIPTPATH/dlpredictor/show_config.py $SCRIPTPATH/conf/config.yml
-fi
-
-# Build ipl_dist_map table AND unique_origianl_uckey
-if true
-then
- spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 32G --driver-memory 32G --py-files dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg,lib/predictor_dl_model-1.6.0-py2.7.egg --conf spark.driver.maxResultSize=5G dlpredictor/main_build_ipl_dist.py conf/config.yml
+ spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --py-files dist/dlpredictor-1.6.0-py2.7.egg dlpredictor/show_config.py conf/config.yml
fi
# Start the predictor
if true
then
- # spark-submit --num-executors 10 --executor-cores 5 --jars lib/elasticsearch-hadoop-6.5.2.jar dlpredictor/main_spark_es.py conf/config.yml '2019-11-03' 's32' '1' 'http://10.193.217.105:8501/v1/models/faezeh:predict'
- # spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 32G --driver-memory 32G --py-files dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg,lib/predictor_dl_model-1.6.0-py2.7.egg --conf spark.driver.maxResultSize=5G dlpredictor/main_spark_es.py conf/config.yml
- spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 32G --driver-memory 32G --py-files $SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg,$SCRIPTPATH/lib/imscommon-2.0.0-py2.7.egg,$SCRIPTPATH/lib/predictor_dl_model-1.6.0-py2.7.egg --jars $SCRIPTPATH/lib/elasticsearch-hadoop-6.8.0.jar $SCRIPTPATH/dlpredictor/main_spark_es.py $SCRIPTPATH/conf/config.yml
+ spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 32G --driver-memory 32G --py-files dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg,lib/predictor_dl_model-1.6.0-py2.7.egg --jars lib/elasticsearch-hadoop-6.8.0.jar dlpredictor/main_spark_es.py conf/config.yml
fi
# Push the data to elasticsearch
if true
then
- # spark-submit --master yarn --num-executors 3 --executor-cores 3 --executor-memory 16G --driver-memory 16G --py-files dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg --jars lib/elasticsearch-hadoop-6.8.0.jar dlpredictor/main_es_push.py conf/config.yml
- spark-submit --master yarn --num-executors 3 --executor-cores 3 --executor-memory 16G --driver-memory 16G --py-files $SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg,$SCRIPTPATH/lib/imscommon-2.0.0-py2.7.egg --jars $SCRIPTPATH/lib/elasticsearch-hadoop-6.8.0.jar $SCRIPTPATH/dlpredictor/main_es_push.py $SCRIPTPATH/conf/config.yml
+ spark-submit --master yarn --num-executors 3 --executor-cores 3 --executor-memory 16G --driver-memory 16G --py-files dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg --jars lib/elasticsearch-hadoop-6.8.0.jar dlpredictor/main_es_push.py conf/config.yml
fi
diff --git a/Processes/dlpredictor/tests/si_traffic_prediction_ckeck/si_traffic_prediction_check_by_agg.py b/Processes/dlpredictor/tests/si_traffic_prediction_ckeck/si_traffic_prediction_check_by_agg.py
index 0733f94..752ab5a 100644
--- a/Processes/dlpredictor/tests/si_traffic_prediction_ckeck/si_traffic_prediction_check_by_agg.py
+++ b/Processes/dlpredictor/tests/si_traffic_prediction_ckeck/si_traffic_prediction_check_by_agg.py
@@ -334,12 +334,12 @@ if __name__ == "__main__":
cfg = {
'log_level': 'WARN',
- 'pre_cluster_table': 'dlpm_10052021_1400_tmp_pre_cluster',
- 'dist_table': 'dlpm_10052021_1400_tmp_distribution',
+ 'pre_cluster_table': 'dlpm_110221_no_residency_no_mapping_tmp_pre_cluster',
+ 'dist_table': 'dlpm_110221_no_residency_no_mapping_tmp_distribution',
'uckey_attrs': ['m', 'si', 't', 'g', 'a', 'pm', 'r', 'ipl'],
'es_host': '10.213.37.41',
'es_port': '9200',
- 'es_predictions_index': 'dlpredictor_10142021_1400_predictions',
+ 'es_predictions_index': 'dlpredictor_110221_no_residency_no_mapping_predictions',
'es_predictions_type': 'doc',
'report_table': 'si_only_traffic_prediction_check_v3'
}
@@ -347,38 +347,37 @@ if __name__ == "__main__":
_agg = {}
# list of last days in dataset, use model-stat to get the days
- target_days = sorted(["2021-07-27", "2021-07-28", "2021-07-29", "2021-07-30", "2021-07-31"])
+    target_days = sorted(["2021-07-12", "2021-07-13", "2021-07-14", "2021-07-15", "2021-07-16", "2021-07-17", "2021-07-18", "2021-07-19", "2021-07-20", "2021-07-21"])
- VERSION = '10142021-1700'
+ VERSION = '110221_no_residency_no_mapping_2'
traffic = {'si': '', 'version': VERSION}
sis = [
- '66bcd2720e5011e79bc8fa163e05184e',
- '7b0d7b55ab0c11e68b7900163e3e481d',
- 'a8syykhszz',
- 'w3wx3nv9ow5i97',
- 'x2fpfbm8rt',
- '17dd6d8098bf11e5bdec00163e291137',
- '5cd1c663263511e6af7500163e291137',
- '68bcd2720e5011e79bc8fa163e05184e',
- '71bcd2720e5011e79bc8fa163e05184e',
- 'a290af82884e11e5bdec00163e291137',
'a47eavw7ex',
- 'b6le0s4qo8',
- 'd4d7362e879511e5bdec00163e291137',
- 'd971z9825e',
- 'd9jucwkpr3',
- 'e351de37263311e6af7500163e291137',
- 'f1iprgyl13',
- 'j1430itab9wj3b',
- 'k4werqx13k',
- 'l03493p0r3',
- 'l2d4ec6csv',
- 'p7gsrebd4m',
- 's4z85pd1h8',
- 'w9fmyd5r0i',
- 'x0ej5xhk60kjwq',
- 'z041bf6g4s']
+        '66bcd2720e5011e79bc8fa163e05184e',
+        'x0ej5xhk60kjwq',
+        'l03493p0r3',
+        '7b0d7b55ab0c11e68b7900163e3e481d',
+        'b6le0s4qo8',
+        'e351de37263311e6af7500163e291137',
+        'a290af82884e11e5bdec00163e291137',
+        '68bcd2720e5011e79bc8fa163e05184e',
+        'f1iprgyl13',
+        'w9fmyd5r0i',
+        'w3wx3nv9ow5i97',
+        'd971z9825e',
+        'l2d4ec6csv',
+        'z041bf6g4s',
+        '71bcd2720e5011e79bc8fa163e05184e',
+        '5cd1c663263511e6af7500163e291137',
+        'x2fpfbm8rt',
+        'd9jucwkpr3',
+        'k4werqx13k',
+        'j1430itab9wj3b',
+        'a8syykhszz',
+        's4z85pd1h8',
+        '17dd6d8098bf11e5bdec00163e291137',
+        'd4d7362e879511e5bdec00163e291137']
sc = SparkContext.getOrCreate()
hive_context = HiveContext(sc)