You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bluemarlin.apache.org by xu...@apache.org on 2021/09/13 22:05:04 UTC

[incubator-bluemarlin] branch main updated: Update main_outlier.py

This is an automated email from the ASF dual-hosted git repository.

xunh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git


The following commit(s) were added to refs/heads/main by this push:
     new 554b52f  Update main_outlier.py
     new 2f14296  Merge pull request #12 from radibnia77/main
554b52f is described below

commit 554b52f6b795dcf4cab7753fb00f4c29a67e0b71
Author: Reza <re...@yahoo.com>
AuthorDate: Mon Sep 13 14:07:37 2021 -0700

    Update main_outlier.py
    
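    Fix range checking: in _filter_outlier, iterate only over interior
    indices (1 .. len(x) - 2) so the x[i - 1] and x[i + 1] neighbour
    accesses stay in bounds
    Fix type conversion: cast each outlier index to int before building
    the literal array column; rename outlier_indice/indice to
    outlier_indices/indices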
---
 .../predictor_dl_model/pipeline/main_outlier.py                   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py
index 2cffd52..f542016 100644
--- a/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py
+++ b/Model/predictor-dl-model/predictor_dl_model/pipeline/main_outlier.py
@@ -47,10 +47,10 @@ def run(hive_context, input_table_name, outlier_table):
     df_result = df_result.na.fill(value=0)
     ts_l = df_result.groupBy().sum().collect()[0]
     ts_l = pd.Series(list(ts_l))
-    outlier_indice = hampel(ts_l, window_size=5, n=6)
+    outlier_indices = hampel(ts_l, window_size=5, n=6)
 
     def _filter_outlier(x, ind_list):
-        for i in range(len(x)):
+        for i in range(1, len(x)-1):
             if i in ind_list and x[i] != None and x[i + 1] != None and x[i - 1] != None:
                 x[i] = (x[i - 1] + x[i + 1]) / 2
         return x
@@ -59,8 +59,8 @@ def run(hive_context, input_table_name, outlier_table):
         SELECT * FROM {}
         """.format(input_table_name)
     df = hive_context.sql(command)
-    df = df.withColumn("indice", array([fn.lit(x) for x in outlier_indice]))
-    df = df.withColumn('ts', udf(_filter_outlier, ArrayType(IntegerType()))(df['ts'], df['indice']))
+    df = df.withColumn("indices", array([fn.lit(int(x)) for x in outlier_indices]))
+    df = df.withColumn('ts', udf(_filter_outlier, ArrayType(IntegerType()))(df['ts'], df['indices']))
     write_to_table(df, outlier_table, mode='overwrite')