You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bluemarlin.apache.org by ra...@apache.org on 2022/02/02 18:55:36 UTC
[incubator-bluemarlin] branch main updated: update lookalike application
This is an automated email from the ASF dual-hosted git repository.
radibnia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git
The following commit(s) were added to refs/heads/main by this push:
new eff880d update lookalike application
new 402e355 Merge pull request #41 from radibnia77/main
eff880d is described below
commit eff880de5cacea28d3cdbb596990054ec60531dd
Author: Reza <re...@yahoo.com>
AuthorDate: Wed Feb 2 10:53:25 2022 -0800
update lookalike application
---
.../application/pipeline/config.yml | 32 ++++----
.../lookalike_model/application/pipeline/run.sh | 2 +
.../application/pipeline/score_generator.py | 59 +++++++--------
...vector_table.py => score_vector_rebucketing.py} | 62 +++++++--------
.../application/pipeline/score_vector_table.py | 20 ++---
.../application/pipeline/top_n_similarity.py | 88 +++++++++++-----------
6 files changed, 130 insertions(+), 133 deletions(-)
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
index 50478a9..92e7c58 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
@@ -1,12 +1,10 @@
product_tag: 'lookalike_application'
-pipeline_tag: '08192021_1m'
+pipeline_tag: '12132021'
score_generator:
input:
- log_table : "lookalike_08172021_1000_logs"
- did_table: "lookalike_08172021_1000_trainready"
- keywords_table: "din_ad_keywords_09172020"
- significant_keywords_table: "lookalike_08172021_1000_keywords"
- din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict"
+ aid_table: "lookalike_11192021_trainready"
+ keywords_table: "lookalike_11192021_keywords"
+ din_model_tf_serving_url: "http://10.193.217.126:8501/v1/models/lookalike_1119:predict"
din_model_length: 20
extend: 2000
alg: "euclidean" ##### currently just support "euclideand" and "dot"
@@ -14,16 +12,22 @@ score_generator:
score_table: "{product_tag}_{pipeline_tag}_score"
normalize: False
score_vector:
- keywords_table: "din_ad_keywords_09172020"
+ keywords_table: "lookalike_11192021_keywords"
score_table: "{product_tag}_{pipeline_tag}_score"
score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
- did_bucket_size: 100
- did_bucket_step: 10
+ aid_bucket_size: 100
+ aid_bucket_step: 10
+# Adds alpha_aid_bucket partition to allow for finer control of step size in top_n_similarity stage
+score_vector_rebucketing:
+ aid_bucket_size: 100
+ aid_bucket_step: 10
+ alpha_aid_bucket_size: 10 # The number of buckets to allocate for the alpha_aid_bucket column
+ score_vector_alpha_table: "{product_tag}_{pipeline_tag}_score_vector_alpha"
top_n_similarity:
- did_bucket_size: 100
- load_bucket_step: 20
- search_bucket_step: 50
- top_n: 10
- index_factory_string: "IVF256,Flat"
+ aid_bucket_size: 100 # Total number of alpha buckets to run the similarity search over; should match score_vector_rebucketing.alpha_aid_bucket_size
+ load_bucket_step: 20 # Number of alpha buckets to load into index at a time
+ search_bucket_step: 50 # Number of alpha buckets to process top N at a time
+ top_n: 10 # Number of nearest neighbors to store for each aid
+ index_factory_string: "IVF256,Flat" # See https://github.com/facebookresearch/faiss/wiki/The-index-factory for factory strings
similarity_table: "{product_tag}_{pipeline_tag}_similarity"
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
index 6af8ce1..10151eb 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
@@ -4,6 +4,8 @@ spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memo
spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml
+
# Run top_n_similarity.py on a machine with a GPU.
#spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml
#spark-submit --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
index c57b4b3..550d959 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
@@ -30,10 +30,10 @@ from util import resolve_placeholder, write_to_table_with_partition
'''
This process generates the score-table with the following format.
-DataFrame[age: int, gender: int, did: string, did_index: bigint,
+DataFrame[age: int, gender: int, aid: string, aid_index: bigint,
interval_starting_time: array<string>, interval_keywords: array<string>,
kwi: array<string>, kwi_show_counts: array<string>, kwi_click_counts: array<string>,
-did_bucket: string, kws: map<string,float>, kws_norm: map<string,float>]
+aid_bucket: string, kws: map<string,float>, kws_norm: map<string,float>]
'''
@@ -57,11 +57,11 @@ def str_to_intlist(table):
def input_data(record, keyword, length):
if len(record['show_counts']) >= length:
hist = flatten(record['show_counts'][:length])
- instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
+ instance = {'hist_i': hist, 'u': record['aid'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
else:
hist = flatten(record['show_counts'])
# [hist.extend([0]) for i in range(length - len(hist))]
- instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
+ instance = {'hist_i': hist, 'u': record['aid'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
return instance
@@ -112,12 +112,12 @@ def normalize(x):
class CTRScoreGenerator:
- def __init__(self, df_did, df_keywords, din_model_tf_serving_url, din_model_length):
- self.df_did = df_did
+ def __init__(self, df_aid, df_keywords, din_model_tf_serving_url, din_model_length):
+ self.df_aid = df_aid
self.df_keywords = df_keywords
self.din_model_tf_serving_url = din_model_tf_serving_url
self.din_model_length = din_model_length
- self.df_did_loaded = None
+ self.df_aid_loaded = None
self.keyword_index_list, self.keyword_list = self.get_keywords()
def get_keywords(self):
@@ -131,33 +131,35 @@ class CTRScoreGenerator:
def run(self):
def predict_udf(din_model_length, din_model_tf_serving_url, keyword_index_list, keyword_list):
- def __helper(did, kwi_show_counts, age, gender):
+ def __helper(aid, kwi_show_counts, age, gender):
kwi_show_counts = str_to_intlist(kwi_show_counts)
- record = {'did': did,
+ record = {'aid': aid,
'show_counts': kwi_show_counts,
'a': str(age),
'g': str(gender)}
response = predict(serving_url=din_model_tf_serving_url, record=record,
length=din_model_length, new_keyword=keyword_index_list)
+ # If the response is a string, there was an error.
+ assert not isinstance(response, str), 'Error occurred when retrieving keyword scores from URL.'
- did_kw_scores = dict()
+ aid_kw_scores = dict()
for i in range(len(response)):
keyword = keyword_list[i]
keyword_score = response[i][0]
- did_kw_scores[keyword] = keyword_score
+ aid_kw_scores[keyword] = keyword_score
- return did_kw_scores
+ return aid_kw_scores
return __helper
- self.df_did_loaded = self.df_did.withColumn('kws',
+ self.df_aid_loaded = self.df_aid.withColumn('kws',
udf(predict_udf(din_model_length=self.din_model_length,
din_model_tf_serving_url=self.din_model_tf_serving_url,
keyword_index_list=self.keyword_index_list,
keyword_list=self.keyword_list),
MapType(StringType(), FloatType()))
- (col('did_index'), col('kwi_show_counts'), col('age'), col('gender')))
+ (col('aid_index'), col('kwi_show_counts'), col('age'), col('gender_index')))
if __name__ == '__main__':
@@ -173,31 +175,28 @@ if __name__ == '__main__':
hive_context = HiveContext(sc)
# load dataframes
- did_table, keywords_table, significant_keywords_table, din_tf_serving_url, length = cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][
- 'keywords_table'], cfg['score_generator']['input'][
- 'significant_keywords_table'], cfg['score_generator']['input']['din_model_tf_serving_url'], cfg['score_generator']['input']['din_model_length']
+ aid_table = cfg['score_generator']['input']['aid_table']
+ keywords_table = cfg['score_generator']['input']['keywords_table']
+ din_tf_serving_url = cfg['score_generator']['input']['din_model_tf_serving_url']
+ length = cfg['score_generator']['input']['din_model_length']
- command = 'SELECT * FROM {}'
- df_did = hive_context.sql(command.format(did_table))
+ command = 'SELECT * FROM {}'.format(aid_table)
+ df_aid = hive_context.sql(command)
- command = 'SELECT T1.keyword,T1.spread_app_id,T1.keyword_index FROM {} AS T1 JOIN {} AS T2 ON T1.keyword=T2.keyword'
- df_keywords = hive_context.sql(command.format(keywords_table, significant_keywords_table))
- # temporary adding to filter based on active keywords
- df_keywords = df_keywords.filter((df_keywords.keyword == 'video') | (df_keywords.keyword == 'shopping') | (df_keywords.keyword == 'info') |
- (df_keywords.keyword == 'social') | (df_keywords.keyword == 'reading') | (df_keywords.keyword == 'travel') |
- (df_keywords.keyword == 'entertainment'))
+ command = 'SELECT keyword, keyword_index FROM {}'.format(keywords_table)
+ df_keywords = hive_context.sql(command)
score_table = cfg['score_generator']['output']['score_table']
- # create a CTR score generator instance and run to get the loaded did
- ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, din_tf_serving_url, length)
+ # create a CTR score generator instance and run to get the loaded aid
+ ctr_score_generator = CTRScoreGenerator(df_aid, df_keywords, din_tf_serving_url, length)
ctr_score_generator.run()
- df = ctr_score_generator.df_did_loaded
+ df = ctr_score_generator.df_aid_loaded
# normalization is required
udf_normalize = udf(normalize, MapType(StringType(), FloatType()))
if cfg['score_generator']['normalize']:
df = df.withColumn('kws_norm', udf_normalize(col('kws')))
- # save the loaded did to hive table
- write_to_table_with_partition(df, score_table, partition=('did_bucket'), mode='overwrite')
+ # save the loaded aid to hive table
+ write_to_table_with_partition(df, score_table, partition=('aid_bucket'), mode='overwrite')
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
similarity index 54%
copy from Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
copy to Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
index 9c33903..143e43d 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
@@ -19,7 +19,7 @@ import argparse
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit, col, udf
-from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType
+from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType, IntegerType
# from rest_client import predict, str_to_intlist
import requests
import json
@@ -27,77 +27,67 @@ import argparse
from pyspark.sql.functions import udf
from math import sqrt
import time
-import numpy as np
-
-from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition
+import hashlib
from util import resolve_placeholder
+from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition
+
'''
To run, execute the following in application folder.
-spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml
-This process generates the score_vector_table table.
+This process adds a secondary bucket id column (alpha_aid_bucket) and writes the result to the score_vector_alpha table.
-The top-n-similarity table is
+'''
-|user| score-vector | did-bucket
-|:-------------| :------------: |
-|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
-|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
-|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
-'''
+def assign_new_bucket_id(df, n, new_column_name):
+ def __hash_sha256(s):
+ hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
+ return int(hex_value, 16)
+ _udf = udf(lambda x: __hash_sha256(x) % n, IntegerType())
+ df = df.withColumn(new_column_name, _udf(df.aid))
+ return df
def run(hive_context, cfg):
- keywords_table = cfg["score_vector"]["keywords_table"]
- score_table = cfg['score_vector']['score_table']
score_vector_table = cfg['score_vector']['score_vector_table']
- bucket_size = cfg['score_vector']['did_bucket_size']
- bucket_step = cfg['score_vector']['did_bucket_step']
-
- # get kw list
- keywords = hive_context.sql("SELECT DISTINCT(keyword) FROM {}".format(keywords_table)).collect()
- keywords = [_['keyword'] for _ in keywords]
- keywords = sorted(keywords)
+ bucket_size = cfg['score_vector_rebucketing']['aid_bucket_size']
+ bucket_step = cfg['score_vector_rebucketing']['aid_bucket_step']
+ alpha_bucket_size = cfg['score_vector_rebucketing']['alpha_aid_bucket_size']
+ score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']
- # add score-vector iterativly
first_round = True
num_batches = (bucket_size + bucket_step - 1) / bucket_step
batch_num = 1
- for did_bucket in range(0, bucket_size, bucket_step):
- print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, did_bucket))
+ for aid_bucket in range(0, bucket_size, bucket_step):
+ print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, aid_bucket))
- command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_table, did_bucket, min(did_bucket+bucket_step-1, bucket_size))
+ command = "SELECT aid, aid_bucket, score_vector, c1 FROM {} WHERE aid_bucket BETWEEN {} AND {}".format(score_vector_table, aid_bucket, min(aid_bucket+bucket_step-1, bucket_size))
- # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0 |
- # [social -> 0.24231663, entertainment -> 0.20828941, reading -> 0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, info -> 0.1978679]|
df = hive_context.sql(command)
- df = df.withColumn("score_vector",
- udf(lambda kws: [kws[keyword] if keyword in kws else 0.0 for keyword in keywords], ArrayType(FloatType()))(df.kws))
-
- df = df.withColumn('c1', udf(lambda x: float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector))
+ df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_aid_bucket')
mode = 'overwrite' if first_round else 'append'
- write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
-
+ write_to_table_with_partition(df.select('aid', 'score_vector', 'c1', 'aid_bucket', 'alpha_aid_bucket'),
+ score_vector_alpha_table, partition=('aid_bucket', 'alpha_aid_bucket'), mode=mode)
first_round = False
batch_num += 1
if __name__ == "__main__":
start = time.time()
- parser = argparse.ArgumentParser(description=" ")
+ parser = argparse.ArgumentParser(description='')
parser.add_argument('config_file')
args = parser.parse_args()
with open(args.config_file, 'r') as yml_file:
cfg = yaml.safe_load(yml_file)
resolve_placeholder(cfg)
sc = SparkContext.getOrCreate()
- sc.setLogLevel('INFO')
+ sc.setLogLevel('WARN')
hive_context = HiveContext(sc)
run(hive_context=hive_context, cfg=cfg)
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
index 9c33903..cfbe1b8 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
@@ -42,11 +42,11 @@ This process generates the score_vector_table table.
The top-n-similarity table is
-|user| score-vector | did-bucket
+|user| score-vector | aid-bucket
|:-------------| :------------: |
-|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
-|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
-|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
+|user-1-aid| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
+|user-2-aid| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
+|user-3-aid| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
'''
@@ -56,8 +56,8 @@ def run(hive_context, cfg):
keywords_table = cfg["score_vector"]["keywords_table"]
score_table = cfg['score_vector']['score_table']
score_vector_table = cfg['score_vector']['score_vector_table']
- bucket_size = cfg['score_vector']['did_bucket_size']
- bucket_step = cfg['score_vector']['did_bucket_step']
+ bucket_size = cfg['score_vector']['aid_bucket_size']
+ bucket_step = cfg['score_vector']['aid_bucket_step']
# get kw list
keywords = hive_context.sql("SELECT DISTINCT(keyword) FROM {}".format(keywords_table)).collect()
@@ -68,10 +68,10 @@ def run(hive_context, cfg):
first_round = True
num_batches = (bucket_size + bucket_step - 1) / bucket_step
batch_num = 1
- for did_bucket in range(0, bucket_size, bucket_step):
- print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, did_bucket))
+ for aid_bucket in range(0, bucket_size, bucket_step):
+ print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, aid_bucket))
- command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_table, did_bucket, min(did_bucket+bucket_step-1, bucket_size))
+ command = "SELECT aid, aid_bucket, kws FROM {} WHERE aid_bucket BETWEEN {} AND {}".format(score_table, aid_bucket, min(aid_bucket+bucket_step-1, bucket_size))
# |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0 |
# [social -> 0.24231663, entertainment -> 0.20828941, reading -> 0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, info -> 0.1978679]|
@@ -82,7 +82,7 @@ def run(hive_context, cfg):
df = df.withColumn('c1', udf(lambda x: float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector))
mode = 'overwrite' if first_round else 'append'
- write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
+ write_to_table_with_partition(df.select('aid', 'score_vector', 'c1', 'aid_bucket'), score_vector_table, partition=('aid_bucket'), mode=mode)
first_round = False
batch_num += 1
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py
index e031f9e..8e7289b 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity.py
@@ -39,9 +39,9 @@ The top-n-similarity table is
|user| top-N-similarity|top-n-users
|:-------------| :------------: |
-|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] |[user-did-1, user-did-2, user-did-3]|
-|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] |[user-did-10, user-did-20, user-did-30]|
-|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] |[user-did-23, user-did-87, user-did-45]|
+|user-1-aid| [similarity-score-11, similarity-score-12, similarity-score-13] |[user-aid-1, user-aid-2, user-aid-3]|
+|user-2-aid| [similarity-score-21, similarity-score-22, similarity-score-23] |[user-aid-10, user-aid-20, user-aid-30]|
+|user-3-aid| [similarity-score-31, similarity-score-32, similarity-score-33] |[user-aid-23, user-aid-87, user-aid-45]|
'''
@@ -49,27 +49,27 @@ The top-n-similarity table is
def load_score_vectors(spark_session, score_vector_table,
bucket, bucket_step, bucket_size):
# Run the query of the Hive data.
- command = "SELECT did, did_bucket, score_vector FROM {} WHERE did_bucket BETWEEN {} AND {}".format(
+ command = "SELECT aid, aid_bucket, score_vector, alpha_aid_bucket FROM {} WHERE alpha_aid_bucket BETWEEN {} AND {}".format(
score_vector_table, bucket, min(bucket+bucket_step-1, bucket_size))
df = spark_session.sql(command)
- # Get the dids, score vectors, and buckets as numpy arrays.
+ # Get the aids, score vectors, and buckets as numpy arrays.
- # we need to collect them first and then seperate them, otherwise we cannot relate dids to scors
- _all = df.select('did', 'score_vector', 'did_bucket').collect()
- dids = np.array([_['did'] for _ in _all])
- score_vectors = np.array([_['score_vector'] for _ in _all], dtype='f4')
- buckets = np.array([_['did_bucket'] for _ in _all])
+ # We need to collect them first and then separate them; otherwise we cannot relate aids to scores.
+ _all = df.select('aid', 'score_vector', 'aid_bucket').collect()
+ aids = np.array([i['aid'] for i in _all])
+ score_vectors = np.array([i['score_vector'] for i in _all], dtype='f4')
+ buckets = np.array([i['aid_bucket'] for i in _all])
- # Return the dids and score_vectors.
- return (dids, score_vectors, buckets)
+ # Return the aids and score_vectors.
+ return (aids, score_vectors, buckets)
def run(spark_session, cfg):
- score_vector_table = cfg['score_vector']['score_vector_table']
+ score_vector_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']
similarity_table = cfg['top_n_similarity']['similarity_table']
top_n_value = cfg['top_n_similarity']['top_n']
- did_bucket_size = cfg['top_n_similarity']['did_bucket_size']
+ aid_bucket_size = cfg['top_n_similarity']['aid_bucket_size']
load_bucket_step = cfg['top_n_similarity']['load_bucket_step']
search_bucket_step = cfg['top_n_similarity']['search_bucket_step']
index_factory_string = cfg['top_n_similarity']['index_factory_string']
@@ -82,20 +82,20 @@ def run(spark_session, cfg):
start_time = time.time()
# Load the score vectors into the index.
- did_list = []
- for did_bucket in range(0, did_bucket_size, load_bucket_step):
- print('Loading buckets {} - {} of {}'.format(did_bucket, did_bucket+load_bucket_step-1, did_bucket_size))
- (dids, score_vectors, _) = load_score_vectors(spark_session, score_vector_table,
- did_bucket, load_bucket_step, did_bucket_size)
-
- # Keep track of the dids.
- if did_bucket == 0:
- did_list = dids
+ aid_list = []
+ for aid_bucket in range(0, aid_bucket_size, load_bucket_step):
+ print('Loading alpha buckets {} - {} of {}'.format(aid_bucket, aid_bucket+load_bucket_step-1, aid_bucket_size))
+ (aids, score_vectors, _) = load_score_vectors(spark_session, score_vector_table,
+ aid_bucket, load_bucket_step, aid_bucket_size)
+
+ # Keep track of the aids.
+ if aid_bucket == 0:
+ aid_list = aids
else:
- did_list = np.concatenate((did_list, dids))
+ aid_list = np.concatenate((aid_list, aids))
# Create the FAISS index on the first iteration.
- if did_bucket == 0:
+ if aid_bucket == 0:
cpu_index = faiss.index_factory(score_vectors.shape[1], index_factory_string)
gpu_index = faiss.index_cpu_to_all_gpus(cpu_index)
@@ -106,7 +106,6 @@ def run(spark_session, cfg):
# Add the vectors to the index.
gpu_index.add(score_vectors)
-
load_time = time.time()
# Find the top N by bucket step.
@@ -116,36 +115,38 @@ def run(spark_session, cfg):
total_load_time = 0
total_format_time = 0
total_write_time = 0
- for did_bucket in range(0, did_bucket_size, search_bucket_step):
- # Search for the top N similar users for bucket.
- (dids, score_vectors, buckets) = load_score_vectors(spark_session, score_vector_table, did_bucket, search_bucket_step, did_bucket_size)
+ for aid_bucket in range(0, aid_bucket_size, search_bucket_step):
+ print('Searching alpha buckets {} - {} of {}'.format(aid_bucket, aid_bucket+search_bucket_step-1, aid_bucket_size))
+
+ # Load the users to perform the search with.
+ print('Loading users from Hive')
+ (aids, score_vectors, buckets) = load_score_vectors(spark_session, score_vector_table, aid_bucket, search_bucket_step, aid_bucket_size)
end_load = time.time()
total_load_time += end_load-start_load
+ # Search for the top N similar users for bucket.
+ print('Performing the search')
top_n_distances, top_n_indices = gpu_index.search(score_vectors, top_n_value)
end_search = time.time()
total_search_time += end_search-end_load
- # start_load = end_search
- # continue
-
- # Get the top N dids from the top N indexes.
- top_n_dids = did_list[top_n_indices]
+ # Get the top N aids from the top N indexes.
+ top_n_aids = aid_list[top_n_indices]
# Format and write the result back to Hive.
# Format the data for a Spark dataframe in order to write to Hive.
- # [ ('0000001', [{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 'score':0.66903623}], 0),
- # ('0000002', [{'did':'0000002', 'score':1.73205081}, {'did':'0000004', 'score':1.50844401}, {'did':'0000001', 'score':0.66903623}, {'did':'0000003', 'score':0.66903623}], 0),
+ # [ ('0000001', [{'aid':'0000001', 'score':1.73205081}, {'aid':'0000003', 'score':1.73205081}, {'aid':'0000004', 'score':0.88532267}, {'aid':'0000002', 'score':0.66903623}], 0),
+ # ('0000002', [{'aid':'0000002', 'score':1.73205081}, {'aid':'0000004', 'score':1.50844401}, {'aid':'0000001', 'score':0.66903623}, {'aid':'0000003', 'score':0.66903623}], 0),
# ... ]
- data = [(str(did), [(str(n_did), float(distance)) for n_did, distance in zip(top_did, top_distances)], int(bucket))
- for did, top_did, top_distances, bucket in zip(dids, top_n_dids, top_n_distances, buckets)]
-
+ print('Formatting the output')
+ data = [(str(aid), [(str(n_aid), float(distance)) for n_aid, distance in zip(top_aid, top_distances)], int(bucket))
+ for aid, top_aid, top_distances, bucket in zip(aids, top_n_aids, top_n_distances, buckets)]
# Output dataframe schema.
schema = StructType([
- StructField("did", StringType(), True),
- StructField("top_n_similar_user", ArrayType(StructType([StructField('did', StringType(), False), StructField('score', FloatType(), False)]), True)),
- StructField("did_bucket", IntegerType(), True)
+ StructField("aid", StringType(), True),
+ StructField("top_n_similar_user", ArrayType(StructType([StructField('aid', StringType(), False), StructField('score', FloatType(), False)]), True)),
+ StructField("aid_bucket", IntegerType(), True)
])
# Create the output dataframe with the similar users for each user.
@@ -154,7 +155,8 @@ def run(spark_session, cfg):
total_format_time += end_format-end_search
# Write the output dataframe to Hive.
- write_to_table_with_partition(df, similarity_table, partition=('did_bucket'), mode=mode)
+ print('Writing output to Hive')
+ write_to_table_with_partition(df, similarity_table, partition=('aid_bucket'), mode=mode)
mode = 'append'
end_write = time.time()
total_write_time += end_write-end_format