You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bluemarlin.apache.org by xu...@apache.org on 2021/09/13 22:05:04 UTC
[incubator-bluemarlin] branch main updated: Update main_outlier.py
This is an automated email from the ASF dual-hosted git repository.
xunh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git
The following commit(s) were added to refs/heads/main by this push:
new 554b52f Update main_outlier.py
new 2f14296 Merge pull request #12 from radibnia77/main
554b52f is described below
commit 554b52f6b795dcf4cab7753fb00f4c29a67e0b71
Author: Reza <re...@yahoo.com>
AuthorDate: Mon Sep 13 14:07:37 2021 -0700
Update main_outlier.py
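Fix range checking: iterate only over interior indices (range(1, len(x)-1)) so that the x[i - 1] and x[i + 1] neighbor lookups are always valid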
Fix type conversion: cast each outlier index to int before wrapping it in fn.lit
---
.../predictor_dl_model/pipeline/main_outlier.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py
index 2cffd52..f542016 100644
--- a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py
@@ -47,10 +47,10 @@ def run(hive_context, input_table_name, outlier_table):
df_result = df_result.na.fill(value=0)
ts_l = df_result.groupBy().sum().collect()[0]
ts_l = pd.Series(list(ts_l))
- outlier_indice = hampel(ts_l, window_size=5, n=6)
+ outlier_indices = hampel(ts_l, window_size=5, n=6)
def _filter_outlier(x, ind_list):
- for i in range(len(x)):
+ for i in range(1, len(x)-1):
if i in ind_list and x[i] != None and x[i + 1] != None and x[i - 1] != None:
x[i] = (x[i - 1] + x[i + 1]) / 2
return x
@@ -59,8 +59,8 @@ def run(hive_context, input_table_name, outlier_table):
SELECT * FROM {}
""".format(input_table_name)
df = hive_context.sql(command)
- df = df.withColumn("indice", array([fn.lit(x) for x in outlier_indice]))
- df = df.withColumn('ts', udf(_filter_outlier, ArrayType(IntegerType()))(df['ts'], df['indice']))
+ df = df.withColumn("indices", array([fn.lit(int(x)) for x in outlier_indices]))
+ df = df.withColumn('ts', udf(_filter_outlier, ArrayType(IntegerType()))(df['ts'], df['indices']))
write_to_table(df, outlier_table, mode='overwrite')