You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bluemarlin.apache.org by ra...@apache.org on 2022/02/02 18:33:49 UTC
[incubator-bluemarlin] branch main updated: Update main_tfrecord_generator.py
This is an automated email from the ASF dual-hosted git repository.
radibnia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git
The following commit(s) were added to refs/heads/main by this push:
new 4ee29bc Update main_tfrecord_generator.py
new 1575581 Merge pull request #40 from radibnia77/main
4ee29bc is described below
commit 4ee29bcd32ed9780818b14b0bed3de7c96cd468e
Author: Reza <re...@yahoo.com>
AuthorDate: Wed Feb 2 10:32:13 2022 -0800
Update main_tfrecord_generator.py
Replace pandas with spark
---
.../pipeline/main_tfrecord_generator.py | 113 +++++++++------------
1 file changed, 48 insertions(+), 65 deletions(-)
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_tfrecord_generator.py b/Model/lookalike-model/lookalike_model/pipeline/main_tfrecord_generator.py
index b9eb1bd..b406ca5 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_tfrecord_generator.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_tfrecord_generator.py
@@ -16,31 +16,17 @@
# under the License.
"""
-spark-submit --master yarn --executor-memory 16G --driver-memory 24G --num-executors 10 --executor-cores 5 --jars spark-tensorflow-connector_2.11-1.15.0.jar --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/_main_trainready_india.py config.yml
+spark-submit --master yarn --executor-memory 16G --driver-memory 24G --num-executors 10 --executor-cores 5 --jars spark-tensorflow-connector_2.11-1.15.0.jar --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/main_tfrecord_generator.py config.yml
input: trainready table
output: dataset readable by trainer in tfrecord format
"""
-import yaml
-import argparse
-import os
-import timeit
-from pyspark import SparkContext
-from pyspark.sql import functions as fn
-from pyspark.sql.functions import lit, col, udf, collect_list, concat_ws, first, create_map, monotonically_increasing_id
-from pyspark.sql.functions import count, lit, col, udf, expr, collect_list, explode
-from pyspark.sql.window import Window
-from pyspark.sql.types import IntegerType, ArrayType, StringType,BooleanType
-from pyspark.sql import HiveContext
-from pyspark.sql.session import SparkSession
-from datetime import datetime, timedelta
-from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition, print_batching_info, resolve_placeholder, load_config, load_batch_config, load_df
-from itertools import chain
-from pyspark.sql.types import IntegerType, ArrayType, StringType, BooleanType, FloatType, DoubleType
-from util import write_to_table, write_to_table_with_partition, save_pickle_file
-
-
-def generate_tfrecord(sc, hive_context, tf_statis_path, keyword_table, cutting_date, length, trainready_table, tfrecords_hdfs_path_train, tfrecords_hdfs_path_test):
+from pyspark.sql.functions import lit, udf, explode
+from pyspark.sql.types import IntegerType, ArrayType, StructType, StructField
+from util import save_pickle_file, resolve_placeholder, load_config
+
+
+def generate_tfrecord(hive_context, tf_stat_path, keyword_table, cutting_date, length, trainready_table, tfrecords_hdfs_path_train, tfrecords_hdfs_path_test):
def str_to_intlist(table):
ji = []
@@ -59,40 +45,13 @@ def generate_tfrecord(sc, hive_context, tf_statis_path, keyword_table, cutting_d
ji.append(s)
return ji
- def flatten(lst):
- f = [y for x in lst for y in x]
- return f
-
def padding(kwlist,length):
diff = length-len(kwlist)
- print(len(kwlist))
- print(length)
- print(diff)
temp_list = [0 for i in range(diff)]
padded_keyword = kwlist + temp_list
return padded_keyword
- def create_dataset(df_panda ,click, keyword):
- t_set = []
- for i in range(len(df_panda.aid_index)):
- click_counts = click[i]
- keyword_int = keyword[i]
- aid_index = df_panda.aid_index[i]
- for m in range(len(click_counts)):
- for n in range(len(click_counts[m])):
- if (click_counts[m][n] != 0):
- pos = (aid_index, flatten(keyword_int[m + 1:m + 1 + length]), keyword_int[m][n], 1)
- if len(pos[1]) >= 1:
- t_set.append(pos)
- elif (m % 5 == 0 and n % 2 == 0):
- neg = (aid_index, flatten(keyword_int[m + 1:m + 1 + length]), keyword_int[m][n], 0)
- if len(neg[1]) >= 1:
- t_set.append(neg)
- return t_set
-
- def generating_dataframe(dataset, spark ):
- data_set = [(int(tup[0]), tup[1], int(tup[2]), int(tup[3])) for tup in dataset]
- df = spark.createDataFrame(data=data_set, schema=deptColumns)
+ def generating_dataframe(df ):
df = df.withColumn("sl", udf(lambda x: len(x), IntegerType())(df.keyword_list))
df = df.where(df.sl > 5)
df = df.withColumn('max_length', lit(df.agg({'sl': 'max'}).collect()[0][0]))
@@ -100,14 +59,37 @@ def generate_tfrecord(sc, hive_context, tf_statis_path, keyword_table, cutting_d
udf(padding, ArrayType(IntegerType()))(df.keyword_list, df.max_length))
return df
- def generate_tf_statistics(testsetDF, trainDF, keyword_df, tf_statis_path):
+ def generate_tf_statistics(testsetDF, trainDF, keyword_df, tf_stat_path):
tfrecords_statistics = {}
tfrecords_statistics['test_dataset_count'] = testsetDF.count()
tfrecords_statistics['train_dataset_count'] = trainDF.count()
tfrecords_statistics['user_count'] = trainDF.select('aid').distinct().count()
tfrecords_statistics['item_count'] = keyword_df.distinct().count() + 1
- save_pickle_file(tfrecords_statistics, tf_statis_path)
+ save_pickle_file(tfrecords_statistics, tf_stat_path)
+
+ def create_trainset(aid_index, click_counts, keyword_int):
+ def flatten(lst):
+ f = [y for x in lst for y in x]
+ return f
+ t_set = []
+ for m in range(len(click_counts)):
+ for n in range(len(click_counts[m])):
+ if (click_counts[m][n] != 0):
+ pos = (aid_index, flatten(keyword_int[m + 1:m + 1 + length]), keyword_int[m][n], 1)
+ if len(pos[1]) >= 1:
+ t_set.append(pos)
+ elif (m % 5 == 0 and n % 2 == 0):
+ neg = (aid_index, flatten(keyword_int[m + 1:m + 1 + length]), keyword_int[m][n], 0)
+ if len(neg[1]) >= 1:
+ t_set.append(neg)
+ return t_set
+ schema = StructType([
+ StructField("aid_index", IntegerType(), True),
+ StructField("keyword_list", ArrayType(IntegerType()), True),
+ StructField("keyword", IntegerType(), True),
+ StructField("label", IntegerType(), True)
+ ])
command = """SELECT * FROM {}"""
df = hive_context.sql(command.format(trainready_table))
@@ -122,27 +104,28 @@ def generate_tfrecord(sc, hive_context, tf_statis_path, keyword_table, cutting_d
df = df.withColumn('keyword_int_test', udf(lambda x, y: x[:y],ArrayType(ArrayType(IntegerType())))(df._kwi, df.indicing))
df = df.withColumn('click_counts_train', udf(lambda x, y: x[y:],ArrayType(ArrayType(IntegerType())))(df.click_counts, df.indicing))
df = df.withColumn('click_counts_test', udf(lambda x, y: x[:y],ArrayType(ArrayType(IntegerType())))(df.click_counts, df.indicing))
+ df = df.withColumn('train_set', udf(create_trainset, ArrayType(schema))(df.aid_index, df.click_counts_train,df.keyword_int_train))
+ df = df.withColumn('test_set', udf(create_trainset, ArrayType(schema))(df.aid_index, df.click_counts_test, df.keyword_int_test))
+ trainDF = df.select(df.aid_index, explode(df.train_set).alias('dataset'))
+ testDF = df.select(df.aid_index, explode(df.test_set).alias('dataset'))
- spark = SparkSession(sc)
- deptColumns = ["aid", "keyword_list", "keyword", "label"]
+ train_set = trainDF.select('aid_index', trainDF.dataset['aid_index'].alias('aid'), trainDF.dataset['keyword_list'].alias('keyword_list'), trainDF.dataset['keyword'].alias('keyword'), trainDF.dataset['label'].alias('label'))
+ test_set = testDF.select('aid_index', testDF.dataset['aid_index'].alias('aid'), testDF.dataset['keyword_list'].alias('keyword_list'), testDF.dataset['keyword'].alias('keyword'), testDF.dataset['label'].alias('label'))
- df_panda = df.select('click_counts_train', 'keyword_int_train', 'aid_index').toPandas()
- train_set = create_dataset(df_panda,df_panda.click_counts_train, df_panda.keyword_int_train)
- trainDF = generating_dataframe(train_set, spark = spark)
- trainDF.write.format("tfrecords").option("recordType", "Example").mode("overwrite").save(tfrecords_hdfs_path_train)
+ train_set = generating_dataframe(train_set)
+ train_set.write.option("header", "true").option("encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(tfrecords_hdfs_path_train)
+ train_set.write.format("tfrecords").option("recordType", "Example").mode("overwrite").save(tfrecords_hdfs_path_train)
- df_panda = df.select('click_counts_test', 'keyword_int_test', 'aid_index').toPandas()
- test_set = create_dataset(df_panda, df_panda.click_counts_test, df_panda.keyword_int_test)
- testsetDF = generating_dataframe(test_set, spark = spark)
+ testsetDF = generating_dataframe(test_set)
testsetDF.write.format("tfrecords").option("recordType", "Example").mode("overwrite").save(tfrecords_hdfs_path_test)
command = "SELECT * from {}"
keyword_df = hive_context.sql(command.format(keyword_table))
- generate_tf_statistics(testsetDF, trainDF, keyword_df, tf_statis_path)
+ generate_tf_statistics(testsetDF, trainDF, keyword_df, tf_stat_path)
-def run(sc, hive_context, cfg):
+def run(hive_context, cfg):
cfgp = cfg['pipeline']
cfg_train = cfg['pipeline']['main_trainready']
trainready_table = cfg_train['trainready_output_table']
@@ -151,11 +134,11 @@ def run(sc, hive_context, cfg):
tfrecords_hdfs_path_test = cfg_tfrecord['tfrecords_hdfs_path_test']
cutting_date = cfg['pipeline']['cutting_date']
length = cfg['pipeline']['length']
- tf_statis_path = cfgp['tfrecords']['tfrecords_statistics_path']
+ tf_stat_path = cfgp['tfrecords']['tfrecords_statistics_path']
keyword_table = cfgp['main_keywords']['keyword_output_table']
- generate_tfrecord(sc, hive_context, tf_statis_path, keyword_table, cutting_date, length, trainready_table, tfrecords_hdfs_path_train, tfrecords_hdfs_path_test)
+ generate_tfrecord(hive_context, tf_stat_path, keyword_table, cutting_date, length, trainready_table, tfrecords_hdfs_path_train, tfrecords_hdfs_path_test)
if __name__ == "__main__":
@@ -166,5 +149,5 @@ if __name__ == "__main__":
"""
sc, hive_context, cfg = load_config(description="pre-processing train ready data")
resolve_placeholder(cfg)
- run(sc=sc, hive_context=hive_context, cfg=cfg)
+ run(hive_context=hive_context, cfg=cfg)
sc.stop()