Posted to commits@bluemarlin.apache.org by ra...@apache.org on 2022/02/02 18:55:36 UTC

[incubator-bluemarlin] branch main updated: update lookalike application

This is an automated email from the ASF dual-hosted git repository.

radibnia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git


The following commit(s) were added to refs/heads/main by this push:
     new eff880d  update lookalike application
     new 402e355  Merge pull request #41 from radibnia77/main
eff880d is described below

commit eff880de5cacea28d3cdbb596990054ec60531dd
Author: Reza <re...@yahoo.com>
AuthorDate: Wed Feb 2 10:53:25 2022 -0800

    update lookalike application
---
 .../application/pipeline/config.yml                | 32 ++++----
 .../lookalike_model/application/pipeline/run.sh    |  2 +
 .../application/pipeline/score_generator.py        | 59 +++++++--------
 ...vector_table.py => score_vector_rebucketing.py} | 62 +++++++--------
 .../application/pipeline/score_vector_table.py     | 20 ++---
 .../application/pipeline/top_n_similarity.py       | 88 +++++++++++-----------
 6 files changed, 130 insertions(+), 133 deletions(-)

diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
index 50478a9..92e7c58 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
@@ -1,12 +1,10 @@
 product_tag: 'lookalike_application'
-pipeline_tag: '08192021_1m'
+pipeline_tag: '12132021'
 score_generator:
     input:
-        log_table : "lookalike_08172021_1000_logs"
-        did_table: "lookalike_08172021_1000_trainready"
-        keywords_table: "din_ad_keywords_09172020"
-        significant_keywords_table: "lookalike_08172021_1000_keywords"
-        din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict"
+        aid_table: "lookalike_11192021_trainready"
+        keywords_table: "lookalike_11192021_keywords"
+        din_model_tf_serving_url: "http://10.193.217.126:8501/v1/models/lookalike_1119:predict"
         din_model_length: 20
         extend: 2000
         alg: "euclidean" ##### currently just support "euclideand" and "dot"
@@ -14,16 +12,22 @@ score_generator:
         score_table:  "{product_tag}_{pipeline_tag}_score"
     normalize: False       
 score_vector:
-    keywords_table: "din_ad_keywords_09172020"
+    keywords_table: "lookalike_11192021_keywords"
     score_table:  "{product_tag}_{pipeline_tag}_score"
     score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
-    did_bucket_size: 100
-    did_bucket_step: 10
+    aid_bucket_size: 100
+    aid_bucket_step: 10
+# Adds alpha_aid_bucket partition to allow for finer control of step size in top_n_similarity stage
+score_vector_rebucketing:
+    aid_bucket_size: 100
+    aid_bucket_step: 10
+    alpha_aid_bucket_size: 10  # The number of buckets to allocate for the alpha_aid_bucket column
+    score_vector_alpha_table: "{product_tag}_{pipeline_tag}_score_vector_alpha"
 top_n_similarity:
-    did_bucket_size: 100
-    load_bucket_step: 20
-    search_bucket_step: 50
-    top_n: 10
-    index_factory_string: "IVF256,Flat"
+    aid_bucket_size: 100       # Total number of alpha buckets to process similarity of
+    load_bucket_step: 20       # Number of alpha buckets to load into index at a time
+    search_bucket_step: 50     # Number of alpha buckets to process top N at a time
+    top_n: 10                  # Number of nearest neighbors to store for each aid
+    index_factory_string: "IVF256,Flat"  # See https://github.com/facebookresearch/faiss/wiki/The-index-factory for factory strings
     similarity_table: "{product_tag}_{pipeline_tag}_similarity"
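The "{product_tag}_{pipeline_tag}" placeholders in the table names above are expanded at runtime by resolve_placeholder (imported from util in the scripts below), so score_table, for example, becomes "lookalike_application_12132021_score" with this config. As a rough, hypothetical sketch only (the real helper may be implemented differently), the substitution amounts to:

    # Hypothetical sketch; the actual resolve_placeholder in util may differ.
    def resolve_placeholder_sketch(cfg):
        tags = {'product_tag': cfg['product_tag'], 'pipeline_tag': cfg['pipeline_tag']}
        def _walk(node):
            if isinstance(node, dict):
                return {k: _walk(v) for k, v in node.items()}
            if isinstance(node, str):
                # "{product_tag}_{pipeline_tag}_score" -> "lookalike_application_12132021_score"
                return node.format(**tags)
            return node
        return _walk(cfg)
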
     
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
index 6af8ce1..10151eb 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
@@ -4,6 +4,8 @@ spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memo
 
 spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
 
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml
+
 # Run top_n_similarity.py on a machine with a GPU.
 #spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml
 #spark-submit --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
index c57b4b3..550d959 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
@@ -30,10 +30,10 @@ from util import resolve_placeholder, write_to_table_with_partition
 '''
 This process generates the score-table with the following format.
 
-DataFrame[age: int, gender: int, did: string, did_index: bigint, 
+DataFrame[age: int, gender: int, aid: string, aid_index: bigint, 
 interval_starting_time: array<string>, interval_keywords: array<string>, 
 kwi: array<string>, kwi_show_counts: array<string>, kwi_click_counts: array<string>, 
-did_bucket: string, kws: map<string,float>, kws_norm: map<string,float>]
+aid_bucket: string, kws: map<string,float>, kws_norm: map<string,float>]
 
 '''
 
@@ -57,11 +57,11 @@ def str_to_intlist(table):
 def input_data(record, keyword, length):
     if len(record['show_counts']) >= length:
         hist = flatten(record['show_counts'][:length])
-        instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
+        instance = {'hist_i': hist, 'u': record['aid'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
     else:
         hist = flatten(record['show_counts'])
         # [hist.extend([0]) for i in range(length - len(hist))]
-        instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
+        instance = {'hist_i': hist, 'u': record['aid'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
     return instance
 
 
@@ -112,12 +112,12 @@ def normalize(x):
 
 
 class CTRScoreGenerator:
-    def __init__(self, df_did, df_keywords, din_model_tf_serving_url, din_model_length):
-        self.df_did = df_did
+    def __init__(self, df_aid, df_keywords, din_model_tf_serving_url, din_model_length):
+        self.df_aid = df_aid
         self.df_keywords = df_keywords
         self.din_model_tf_serving_url = din_model_tf_serving_url
         self.din_model_length = din_model_length
-        self.df_did_loaded = None
+        self.df_aid_loaded = None
         self.keyword_index_list, self.keyword_list = self.get_keywords()
 
     def get_keywords(self):
@@ -131,33 +131,35 @@ class CTRScoreGenerator:
     def run(self):
 
         def predict_udf(din_model_length, din_model_tf_serving_url, keyword_index_list, keyword_list):
-            def __helper(did, kwi_show_counts, age, gender):
+            def __helper(aid, kwi_show_counts, age, gender):
                 kwi_show_counts = str_to_intlist(kwi_show_counts)
-                record = {'did': did,
+                record = {'aid': aid,
                           'show_counts': kwi_show_counts,
                           'a': str(age),
                           'g': str(gender)}
 
                 response = predict(serving_url=din_model_tf_serving_url, record=record,
                                    length=din_model_length, new_keyword=keyword_index_list)
+                # If the response is a string, there was an error.
+                assert not isinstance(response, str), 'Error occurred when retrieving keyword scores from URL.'
 
-                did_kw_scores = dict()
+                aid_kw_scores = dict()
                 for i in range(len(response)):
                     keyword = keyword_list[i]
                     keyword_score = response[i][0]
-                    did_kw_scores[keyword] = keyword_score
+                    aid_kw_scores[keyword] = keyword_score
 
-                return did_kw_scores
+                return aid_kw_scores
 
             return __helper
 
-        self.df_did_loaded = self.df_did.withColumn('kws',
+        self.df_aid_loaded = self.df_aid.withColumn('kws',
                                                     udf(predict_udf(din_model_length=self.din_model_length,
                                                                     din_model_tf_serving_url=self.din_model_tf_serving_url,
                                                                     keyword_index_list=self.keyword_index_list,
                                                                     keyword_list=self.keyword_list),
                                                         MapType(StringType(), FloatType()))
-                                                    (col('did_index'), col('kwi_show_counts'), col('age'), col('gender')))
+                                                    (col('aid_index'), col('kwi_show_counts'), col('age'), col('gender_index')))
 
 
 if __name__ == '__main__':
@@ -173,31 +175,28 @@ if __name__ == '__main__':
     hive_context = HiveContext(sc)
 
     # load dataframes
-    did_table, keywords_table, significant_keywords_table, din_tf_serving_url, length = cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][
-        'keywords_table'], cfg['score_generator']['input'][
-        'significant_keywords_table'], cfg['score_generator']['input']['din_model_tf_serving_url'], cfg['score_generator']['input']['din_model_length']
+    aid_table = cfg['score_generator']['input']['aid_table']
+    keywords_table = cfg['score_generator']['input']['keywords_table']
+    din_tf_serving_url = cfg['score_generator']['input']['din_model_tf_serving_url']
+    length = cfg['score_generator']['input']['din_model_length']
 
-    command = 'SELECT * FROM {}'
-    df_did = hive_context.sql(command.format(did_table))
+    command = 'SELECT * FROM {}'.format(aid_table)
+    df_aid = hive_context.sql(command)
 
-    command = 'SELECT T1.keyword,T1.spread_app_id,T1.keyword_index FROM {} AS T1 JOIN {} AS T2 ON T1.keyword=T2.keyword'
-    df_keywords = hive_context.sql(command.format(keywords_table, significant_keywords_table))
-    # temporary adding to filter based on active keywords
-    df_keywords = df_keywords.filter((df_keywords.keyword == 'video') | (df_keywords.keyword == 'shopping') | (df_keywords.keyword == 'info') |
-                                     (df_keywords.keyword == 'social') | (df_keywords.keyword == 'reading') | (df_keywords.keyword == 'travel') |
-                                     (df_keywords.keyword == 'entertainment'))
+    command = 'SELECT keyword, keyword_index FROM {}'.format(keywords_table)
+    df_keywords = hive_context.sql(command)
 
     score_table = cfg['score_generator']['output']['score_table']
 
-    # create a CTR score generator instance and run to get the loaded did
-    ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, din_tf_serving_url, length)
+    # create a CTR score generator instance and run to get the loaded aid
+    ctr_score_generator = CTRScoreGenerator(df_aid, df_keywords, din_tf_serving_url, length)
     ctr_score_generator.run()
-    df = ctr_score_generator.df_did_loaded
+    df = ctr_score_generator.df_aid_loaded
 
     # normalization is required
     udf_normalize = udf(normalize, MapType(StringType(), FloatType()))
     if cfg['score_generator']['normalize']:
         df = df.withColumn('kws_norm', udf_normalize(col('kws')))
 
-    # save the loaded did to hive table
-    write_to_table_with_partition(df, score_table, partition=('did_bucket'), mode='overwrite')
+    # save the loaded aid to hive table
+    write_to_table_with_partition(df, score_table, partition=('aid_bucket'), mode='overwrite')
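For reference, predict() in rest_client (not part of this diff) posts instances to the DIN model's TensorFlow Serving ":predict" endpoint configured as din_model_tf_serving_url. A hedged sketch of such a call, assuming the standard TF Serving REST payload and an instance layout like input_data() above (how age and gender are attached is an assumption):

    import requests

    def predict_sketch(serving_url, instances):
        # TF Serving REST API: POST {"instances": [...]} and read back {"predictions": [...]}.
        resp = requests.post(serving_url, json={'instances': instances}, timeout=30)
        resp.raise_for_status()
        return resp.json()['predictions']  # one prediction per instance, e.g. [[score], [score], ...]

    # Hypothetical usage: one instance per candidate keyword for a single aid.
    # scores = predict_sketch(cfg['score_generator']['input']['din_model_tf_serving_url'],
    #                         [input_data(record, kw, 20) for kw in keyword_index_list])
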
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
similarity index 54%
copy from Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
copy to Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
index 9c33903..143e43d 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
@@ -19,7 +19,7 @@ import argparse
 from pyspark import SparkContext
 from pyspark.sql import HiveContext
 from pyspark.sql.functions import lit, col, udf
-from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType
+from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType, IntegerType
 # from rest_client import predict, str_to_intlist
 import requests
 import json
@@ -27,77 +27,67 @@ import argparse
 from pyspark.sql.functions import udf
 from math import sqrt
 import time
-import numpy as np
-
-from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition
+import hashlib
 from util import resolve_placeholder
 
 
+from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition
+
 '''
 
 To run, execute the following in application folder.
-spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml
 
-This process generates the score_vector_table table.
+This process adds a secondary bucket id (alpha_aid_bucket) to each record of the score vector table.
 
-The top-n-similarity table is 
+'''
 
-|user| score-vector | did-bucket
-|:-------------| :------------: |
-|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
-|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
-|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
 
-'''
+def assign_new_bucket_id(df, n, new_column_name):
+    def __hash_sha256(s):
+        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
+        return int(hex_value, 16)
+    _udf = udf(lambda x: __hash_sha256(x) % n, IntegerType())
+    df = df.withColumn(new_column_name, _udf(df.aid))
+    return df
 
 
 def run(hive_context, cfg):
 
-    keywords_table = cfg["score_vector"]["keywords_table"]
-    score_table = cfg['score_vector']['score_table']
     score_vector_table = cfg['score_vector']['score_vector_table']
-    bucket_size = cfg['score_vector']['did_bucket_size']
-    bucket_step = cfg['score_vector']['did_bucket_step']
-
-    # get kw list
-    keywords = hive_context.sql("SELECT DISTINCT(keyword) FROM {}".format(keywords_table)).collect()
-    keywords = [_['keyword'] for _ in keywords]
-    keywords = sorted(keywords)
+    bucket_size = cfg['score_vector_rebucketing']['aid_bucket_size']
+    bucket_step = cfg['score_vector_rebucketing']['aid_bucket_step']
+    alpha_bucket_size = cfg['score_vector_rebucketing']['alpha_aid_bucket_size']
+    score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']
 
-    # add score-vector iterativly
     first_round = True
     num_batches = (bucket_size + bucket_step - 1) / bucket_step
     batch_num = 1
-    for did_bucket in range(0, bucket_size, bucket_step):
-        print('Processing batch {} of {}   bucket number: {}'.format(batch_num, num_batches, did_bucket))
+    for aid_bucket in range(0, bucket_size, bucket_step):
+        print('Processing batch {} of {}   bucket number: {}'.format(batch_num, num_batches, aid_bucket))
 
-        command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_table, did_bucket, min(did_bucket+bucket_step-1, bucket_size))
+        command = "SELECT aid, aid_bucket, score_vector, c1 FROM {} WHERE aid_bucket BETWEEN {} AND {}".format(score_vector_table, aid_bucket, min(aid_bucket+bucket_step-1, bucket_size))
 
-        # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0         |
-        # [social -> 0.24231663, entertainment -> 0.20828941, reading -> 0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, info -> 0.1978679]|
         df = hive_context.sql(command)
-        df = df.withColumn("score_vector",
-                           udf(lambda kws: [kws[keyword] if keyword in kws else 0.0 for keyword in keywords], ArrayType(FloatType()))(df.kws))
-
-        df = df.withColumn('c1', udf(lambda x: float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector))
+        df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_aid_bucket')
 
         mode = 'overwrite' if first_round else 'append'
-        write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
-
+        write_to_table_with_partition(df.select('aid', 'score_vector', 'c1', 'aid_bucket', 'alpha_aid_bucket'),
+                                      score_vector_alpha_table, partition=('aid_bucket', 'alpha_aid_bucket'), mode=mode)
         first_round = False
         batch_num += 1
 
 
 if __name__ == "__main__":
     start = time.time()
-    parser = argparse.ArgumentParser(description=" ")
+    parser = argparse.ArgumentParser(description='')
     parser.add_argument('config_file')
     args = parser.parse_args()
     with open(args.config_file, 'r') as yml_file:
         cfg = yaml.safe_load(yml_file)
         resolve_placeholder(cfg)
     sc = SparkContext.getOrCreate()
-    sc.setLogLevel('INFO')
+    sc.setLogLevel('WARN')
     hive_context = HiveContext(sc)
 
     run(hive_context=hive_context, cfg=cfg)
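The new rebucketing step assigns each aid a deterministic secondary bucket by hashing the aid with SHA-256 and reducing it modulo alpha_aid_bucket_size, so an aid always lands in the same alpha_aid_bucket across runs. A small standalone illustration of the same scheme (the example aids are made up):

    import hashlib

    def alpha_bucket(aid, n_buckets=10):
        # Same scheme as assign_new_bucket_id above: SHA-256 hex digest -> int -> modulo bucket count.
        return int(hashlib.sha256(aid.encode('utf-8')).hexdigest(), 16) % n_buckets

    for aid in ['0000001', '0000002', '0000003']:
        print(aid, alpha_bucket(aid))
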
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
index 9c33903..cfbe1b8 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
@@ -42,11 +42,11 @@ This process generates the score_vector_table table.
 
 The score_vector table is 
 
-|user| score-vector | did-bucket
+|user| score-vector | aid-bucket
 |:-------------| :------------: |
-|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
-|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
-|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
+|user-1-aid| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
+|user-2-aid| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
+|user-3-aid| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
 
 '''
 
@@ -56,8 +56,8 @@ def run(hive_context, cfg):
     keywords_table = cfg["score_vector"]["keywords_table"]
     score_table = cfg['score_vector']['score_table']
     score_vector_table = cfg['score_vector']['score_vector_table']
-    bucket_size = cfg['score_vector']['did_bucket_size']
-    bucket_step = cfg['score_vector']['did_bucket_step']
+    bucket_size = cfg['score_vector']['aid_bucket_size']
+    bucket_step = cfg['score_vector']['aid_bucket_step']
 
     # get kw list
     keywords = hive_context.sql("SELECT DISTINCT(keyword) FROM {}".format(keywords_table)).collect()
@@ -68,10 +68,10 @@ def run(hive_context, cfg):
     first_round = True
     num_batches = (bucket_size + bucket_step - 1) / bucket_step
     batch_num = 1
-    for did_bucket in range(0, bucket_size, bucket_step):
-        print('Processing batch {} of {}   bucket number: {}'.format(batch_num, num_batches, did_bucket))
+    for aid_bucket in range(0, bucket_size, bucket_step):
+        print('Processing batch {} of {}   bucket number: {}'.format(batch_num, num_batches, aid_bucket))
 
-        command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_table, did_bucket, min(did_bucket+bucket_step-1, bucket_size))
+        command = "SELECT aid, aid_bucket, kws FROM {} WHERE aid_bucket BETWEEN {} AND {}".format(score_table, aid_bucket, min(aid_bucket+bucket_step-1, bucket_size))
 
         # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0         |
         # [social -> 0.24231663, entertainment -> 0.20828941, reading -> 0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, info -> 0.1978679]|
@@ -82,7 +82,7 @@ def run(hive_context, cfg):
         df = df.withColumn('c1', udf(lambda x: float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector))
 
         mode = 'overwrite' if first_round else 'append'
-        write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
+        write_to_table_with_partition(df.select('aid', 'score_vector', 'c1', 'aid_bucket'), score_vector_table, partition=('aid_bucket'), mode=mode)
 
         first_round = False
         batch_num += 1
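The score_vector and c1 columns written above can be reproduced with plain numpy: the vector holds the per-keyword scores in a fixed (sorted) keyword order with 0.0 for missing keywords, and c1 is the vector's dot product with itself, presumably cached so later distance computations can reuse it (for the euclidean option, ||a - b||^2 = c1_a + c1_b - 2 * a.b). The keyword scores below are just the example values from the comment in the code:

    import numpy as np

    keywords = sorted(['video', 'shopping', 'info', 'social', 'reading', 'travel', 'entertainment'])
    kws = {'social': 0.24231663, 'entertainment': 0.20828941, 'reading': 0.44120282,
           'video': 0.34497723, 'travel': 0.3453492, 'shopping': 0.5347804, 'info': 0.1978679}

    # Same construction as the UDFs above: fixed keyword order, 0.0 when a keyword is absent.
    score_vector = np.array([kws.get(k, 0.0) for k in keywords], dtype='f4')
    c1 = float(score_vector.dot(score_vector))
    print(score_vector, c1)
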
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py
index e031f9e..8e7289b 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py
@@ -39,9 +39,9 @@ The top-n-similarity table is
 
 |user| top-N-similarity|top-n-users
 |:-------------| :------------: |
-|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] |[user-did-1, user-did-2, user-did-3]|
-|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] |[user-did-10, user-did-20, user-did-30]|
-|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] |[user-did-23, user-did-87, user-did-45]|
+|user-1-aid| [similarity-score-11, similarity-score-12, similarity-score-13] |[user-aid-1, user-aid-2, user-aid-3]|
+|user-2-aid| [similarity-score-21, similarity-score-22, similarity-score-23] |[user-aid-10, user-aid-20, user-aid-30]|
+|user-3-aid| [similarity-score-31, similarity-score-32, similarity-score-33] |[user-aid-23, user-aid-87, user-aid-45]|
 
 '''
 
@@ -49,27 +49,27 @@ The top-n-similarity table is
 def load_score_vectors(spark_session, score_vector_table,
                        bucket, bucket_step, bucket_size):
     # Run the query of the Hive data.
-    command = "SELECT did, did_bucket, score_vector FROM {} WHERE did_bucket BETWEEN {} AND {}".format(
+    command = "SELECT aid, aid_bucket, score_vector, alpha_aid_bucket FROM {} WHERE alpha_aid_bucket BETWEEN {} AND {}".format(
         score_vector_table, bucket, min(bucket+bucket_step-1, bucket_size))
     df = spark_session.sql(command)
 
-    # Get the dids, score vectors, and buckets as numpy arrays.
+    # Get the aids, score vectors, and buckets as numpy arrays.
 
-    # we need to collect them first and then seperate them, otherwise we cannot relate dids to scors
-    _all = df.select('did', 'score_vector', 'did_bucket').collect()
-    dids = np.array([_['did'] for _ in _all])
-    score_vectors = np.array([_['score_vector'] for _ in _all], dtype='f4')
-    buckets = np.array([_['did_bucket'] for _ in _all])
+    # We need to collect them first and then separate them, otherwise we cannot relate aids to scores.
+    _all = df.select('aid', 'score_vector', 'aid_bucket').collect()
+    aids = np.array([i['aid'] for i in _all])
+    score_vectors = np.array([i['score_vector'] for i in _all], dtype='f4')
+    buckets = np.array([i['aid_bucket'] for i in _all])
 
-    # Return the dids and score_vectors.
-    return (dids, score_vectors, buckets)
+    # Return the aids and score_vectors.
+    return (aids, score_vectors, buckets)
 
 
 def run(spark_session, cfg):
-    score_vector_table = cfg['score_vector']['score_vector_table']
+    score_vector_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']
     similarity_table = cfg['top_n_similarity']['similarity_table']
     top_n_value = cfg['top_n_similarity']['top_n']
-    did_bucket_size = cfg['top_n_similarity']['did_bucket_size']
+    aid_bucket_size = cfg['top_n_similarity']['aid_bucket_size']
     load_bucket_step = cfg['top_n_similarity']['load_bucket_step']
     search_bucket_step = cfg['top_n_similarity']['search_bucket_step']
     index_factory_string = cfg['top_n_similarity']['index_factory_string']
@@ -82,20 +82,20 @@ def run(spark_session, cfg):
     start_time = time.time()
 
     # Load the score vectors into the index.
-    did_list = []
-    for did_bucket in range(0, did_bucket_size, load_bucket_step):
-        print('Loading buckets {} - {} of {}'.format(did_bucket, did_bucket+load_bucket_step-1, did_bucket_size))
-        (dids, score_vectors, _) = load_score_vectors(spark_session, score_vector_table,
-                                                    did_bucket, load_bucket_step, did_bucket_size)
-
-        # Keep track of the dids.
-        if did_bucket == 0:
-            did_list = dids
+    aid_list = []
+    for aid_bucket in range(0, aid_bucket_size, load_bucket_step):
+        print('Loading alpha buckets {} - {} of {}'.format(aid_bucket, aid_bucket+load_bucket_step-1, aid_bucket_size))
+        (aids, score_vectors, _) = load_score_vectors(spark_session, score_vector_table,
+                                                      aid_bucket, load_bucket_step, aid_bucket_size)
+
+        # Keep track of the aids.
+        if aid_bucket == 0:
+            aid_list = aids
         else:
-            did_list = np.concatenate((did_list, dids))
+            aid_list = np.concatenate((aid_list, aids))
 
         # Create the FAISS index on the first iteration.
-        if did_bucket == 0:
+        if aid_bucket == 0:
             cpu_index = faiss.index_factory(score_vectors.shape[1], index_factory_string)
             gpu_index = faiss.index_cpu_to_all_gpus(cpu_index)
 
@@ -106,7 +106,6 @@ def run(spark_session, cfg):
         # Add the vectors to the index.
         gpu_index.add(score_vectors)
 
-
     load_time = time.time()
 
     # Find the top N by bucket step.
@@ -116,36 +115,38 @@ def run(spark_session, cfg):
     total_load_time = 0
     total_format_time = 0
     total_write_time = 0
-    for did_bucket in range(0, did_bucket_size, search_bucket_step):
-        # Search for the top N similar users for bucket.
-        (dids, score_vectors, buckets) = load_score_vectors(spark_session, score_vector_table, did_bucket, search_bucket_step, did_bucket_size)
+    for aid_bucket in range(0, aid_bucket_size, search_bucket_step):
+        print('Searching alpha buckets {} - {} of {}'.format(aid_bucket, aid_bucket+search_bucket_step-1, aid_bucket_size))
+
+        # Load the users to perform the search with.
+        print('Loading users from Hive')
+        (aids, score_vectors, buckets) = load_score_vectors(spark_session, score_vector_table, aid_bucket, search_bucket_step, aid_bucket_size)
         end_load = time.time()
         total_load_time += end_load-start_load
 
+        # Search for the top N similar users for bucket.
+        print('Performing the search')
         top_n_distances, top_n_indices = gpu_index.search(score_vectors, top_n_value)
         end_search = time.time()
         total_search_time += end_search-end_load
 
-        # start_load = end_search
-        # continue
-
-        # Get the top N dids from the top N indexes.
-        top_n_dids = did_list[top_n_indices]
+        # Get the top N aids from the top N indexes.
+        top_n_aids = aid_list[top_n_indices]
 
         # Format and write the result back to Hive.
         # Format the data for a Spark dataframe in order to write to Hive.
-        #  [ ('0000001', [{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 'score':0.66903623}], 0),
-        #    ('0000002', [{'did':'0000002', 'score':1.73205081}, {'did':'0000004', 'score':1.50844401}, {'did':'0000001', 'score':0.66903623}, {'did':'0000003', 'score':0.66903623}], 0),
+        #  [ ('0000001', [{'aid':'0000001', 'score':1.73205081}, {'aid':'0000003', 'score':1.73205081}, {'aid':'0000004', 'score':0.88532267}, {'aid':'0000002', 'score':0.66903623}], 0),
+        #    ('0000002', [{'aid':'0000002', 'score':1.73205081}, {'aid':'0000004', 'score':1.50844401}, {'aid':'0000001', 'score':0.66903623}, {'aid':'0000003', 'score':0.66903623}], 0),
         #    ... ]
-        data = [(str(did), [(str(n_did), float(distance)) for n_did, distance in zip(top_did, top_distances)], int(bucket))
-                for did, top_did, top_distances, bucket in zip(dids, top_n_dids, top_n_distances, buckets)]
-
+        print('Formatting the output')
+        data = [(str(aid), [(str(n_aid), float(distance)) for n_aid, distance in zip(top_aid, top_distances)], int(bucket))
+                for aid, top_aid, top_distances, bucket in zip(aids, top_n_aids, top_n_distances, buckets)]
 
         # Output dataframe schema.
         schema = StructType([
-            StructField("did", StringType(), True),
-            StructField("top_n_similar_user", ArrayType(StructType([StructField('did', StringType(), False), StructField('score', FloatType(), False)]), True)),
-            StructField("did_bucket", IntegerType(), True)
+            StructField("aid", StringType(), True),
+            StructField("top_n_similar_user", ArrayType(StructType([StructField('aid', StringType(), False), StructField('score', FloatType(), False)]), True)),
+            StructField("aid_bucket", IntegerType(), True)
         ])
 
         # Create the output dataframe with the similar users for each user.
@@ -154,7 +155,8 @@ def run(spark_session, cfg):
         total_format_time += end_format-end_search
 
         # Write the output dataframe to Hive.
-        write_to_table_with_partition(df, similarity_table, partition=('did_bucket'), mode=mode)
+        print('Writing output to Hive')
+        write_to_table_with_partition(df, similarity_table, partition=('aid_bucket'), mode=mode)
         mode = 'append'
         end_write = time.time()
         total_write_time += end_write-end_format
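
For context, the search stage builds its FAISS index from index_factory_string ("IVF256,Flat" in config.yml) and queries it for the top_n nearest neighbours; the pipeline moves the index to GPU with faiss.index_cpu_to_all_gpus and maps the returned row indices back to aids through aid_list. A minimal CPU-only sketch of that flow with random stand-in data:

    import faiss
    import numpy as np

    d = 7                                             # score-vector dimension (number of keywords); an assumption
    xb = np.random.rand(10000, d).astype('float32')   # stand-in for the loaded score vectors
    xq = xb[:5]                                       # stand-in for the vectors being searched

    index = faiss.index_factory(d, 'IVF256,Flat')     # same factory string as the config
    index.train(xb)                                   # IVF indexes must be trained before adding vectors
    index.add(xb)
    distances, indices = index.search(xq, 10)         # top_n = 10, as in the config
    print(indices[0])                                 # row positions into xb; the real pipeline maps these to aids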