Posted to dev@bluemarlin.apache.org by xu...@apache.org on 2021/03/09 20:49:05 UTC

[incubator-bluemarlin] branch main updated: Update tests and fix UTC time

This is an automated email from the ASF dual-hosted git repository.

xunh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git


The following commit(s) were added to refs/heads/main by this push:
     new 2057d32  Update tests and fix UTC time
     new 42b6986  Merge pull request #1 from radibnia77/main
2057d32 is described below

commit 2057d32c3f19256577233de1807fc5be14a61e1a
Author: Reza <re...@yahoo.com>
AuthorDate: Tue Mar 9 12:05:42 2021 -0800

    Update tests and fix UTC time
    
    1. Add more tests
    2. Fix bug in calculating record interval_time_in_seconds
---
 Model/lookalike-model/doc/SDD.md                   | 144 ++++++++++++++++++++-
 Model/lookalike-model/lookalike_model/config.yml   | 132 +++++++------------
 .../lookalike_model/pipeline/main_clean.py         |  17 +--
 .../lookalike_model/pipeline/main_logs.py          |  18 ++-
 .../lookalike_model/pipeline/main_tfrecords.py     |   1 -
 .../lookalike_model/pipeline/main_trainready.py    |  24 ++--
 .../lookalike_model/trainer/save_model.py          | 129 ++++++++++++++++++
 Model/lookalike-model/tests/config.yml             | 130 -------------------
 .../tests/pipeline/config_clean.yml                |  39 ++++++
 .../lookalike-model/tests/pipeline/config_logs.yml |  26 ++++
 .../tests/pipeline/data_generator.py               |  93 +++++++++++--
 .../tests/pipeline/test_main_clean.py              |  45 +++----
 .../tests/pipeline/test_main_logs.py               | 114 ++++++++++++++++
 Model/lookalike-model/tests/run_test.sh            |   6 +
 14 files changed, 640 insertions(+), 278 deletions(-)

diff --git a/Model/lookalike-model/doc/SDD.md b/Model/lookalike-model/doc/SDD.md
index 437df92..f7f50a6 100644
--- a/Model/lookalike-model/doc/SDD.md
+++ b/Model/lookalike-model/doc/SDD.md
@@ -363,14 +363,16 @@ The Lookalike Service builds on the work done on the DIN model.  From the DIN mo
 ##### System Entity Diagram
 
 ```mermaid
-graph LR
+graph TD
  	user("DMP Console")--> A(["API Handler"])
     style user fill:#f9f,stroke:#333,stroke-width:4px
     
     A-->I[["Extended-Audience-Reader"]]
     I---J[("HDFS Extended Audience")]
     
-    A-->B[["Audience-Extender"]]
+    A-->|Asynchronous|A1[["Audience-Extender"]]
+    A1-->A2[["Queue"]]
+    A2-->B[["Audience-Extender-Worker"]]
     
     B-->|step 1|H["Seed-Data-Reader"]
     B-->|step 2|C["Seed-Clustering"]
@@ -529,4 +531,140 @@ Configuration of the Lookalike Service will be stored in Zookeeper.  The configu
 - Hive table name for score table
 - Number of clusters to group existing audience
 - Number of highest similarities to include in average
-- Percentage of user extension, 
\ No newline at end of file
+- Percentage of user extension
+
+
+
+# Spark Environment Tuning
+
+Low performance in Spark operations can be caused by these factors:
+
+1. Level of Parallelism
+2. Data Locality
+
+
+
+### Level of Parallelism
+
+Spark is about parallel computation. Too little parallelism means the job runs longer; too much parallelism requires a lot of resources. The degree of parallelism therefore depends on the number of cores available in the cluster. **The best way to decide the number of Spark partitions in an RDD is to make the number of partitions equal to the number of cores across the cluster.**
+
+There are two properties that can be used to increase the level of parallelism:
+
+```
+spark.default.parallelism
+spark.sql.shuffle.partitions
+```
+
+`spark.sql.shuffle.partitions` is used when working with Spark SQL or the DataFrame API, while `spark.default.parallelism` applies to RDD operations.
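+
+As a minimal sketch (not taken from the pipeline code; the application name and the value 200 are illustrative), both properties could be set when the Spark session is built, or passed to `spark-submit` through `--conf`:
+
+```python
+from pyspark.sql import SparkSession
+
+# Either property can equally be supplied on the command line, e.g. --conf spark.sql.shuffle.partitions=200
+spark = (SparkSession.builder
+         .appName('lookalike-tuning-example')             # illustrative application name
+         .config('spark.default.parallelism', '200')      # default partition count for RDD operations
+         .config('spark.sql.shuffle.partitions', '200')   # partition count produced by SQL/DataFrame shuffles
+         .enableHiveSupport()
+         .getOrCreate())
+```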
+
+
+
+The right level of parallelism means that each partition fits into the memory of one node. To achieve the right level of parallelism, follow these steps (a sketch of steps b and d follows the list):
+
+> a. Identify the right amount of memory for each executor.
+> b. Partition the data so that each partition fits into the memory of a node.
+> c. Use the right number of executors.
+> d. Respect the partitioning in queries.
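+
+A minimal sketch of steps (b) and (d), assuming a dataframe `df`, a `hive_context` as used elsewhere in the pipeline, and illustrative numbers for the data size, target partition size, table name and bucket value:
+
+```python
+import math
+
+# Step (b): size partitions so that each one fits comfortably in executor memory.
+approx_size_bytes = 8 * 1024 ** 3          # assumed input size of ~8 GB (illustrative)
+target_partition_bytes = 128 * 1024 ** 2   # assumed target of ~128 MB per partition
+num_partitions = int(math.ceil(approx_size_bytes / float(target_partition_bytes)))
+df = df.repartition(num_partitions, 'did_bucket')
+
+# Step (d): respect the partitioning in queries by filtering on the partition column,
+# so that only the matching Hive partitions are scanned.
+command = "SELECT * FROM {} WHERE did_bucket = '{}'"
+df_bucket = hive_context.sql(command.format('lookalike_trainready_table', '1'))
+```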
+
+
+
+### Data Locality
+
+Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together, then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
+
+Calling `groupBy()`, `groupByKey()`, `reduceByKey()`, `join()`, and similar functions on a dataframe shuffles data between executors, and even between machines, and finally repartitions the data into 200 partitions by default. PySpark sets this default of 200 shuffle partitions through the `spark.sql.shuffle.partitions` configuration.
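+
+The effect can be observed directly; the following is a small sketch, assuming an active session `spark` and an illustrative dataframe `df_logs` with a `did` column:
+
+```python
+print(df_logs.rdd.getNumPartitions())     # partitioning before the wide operation
+
+# A wide operation such as groupBy() triggers a shuffle; the result has
+# spark.sql.shuffle.partitions partitions (200 unless overridden).
+spark.conf.set('spark.sql.shuffle.partitions', '200')
+df_counts = df_logs.groupBy('did').count()
+print(df_counts.rdd.getNumPartitions())   # 200
+```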
+
+
+
+### Experiment 
+
+The project was run on a Spark 2.3 cluster with Java 8.
+
+
+
+#### Spark Environment Settings
+
+The Hadoop cluster has 600 GB of memory and 200 V-cores.
+The following command was used for each step of the pipeline.
+
+```shell
+spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict <python-file> config.yml
+```
+
+This command engages about 50% of the cluster (110 V-cores) to carry out the operation.
+
+
+
+#### Elapsed Time
+
+The following is the elapsed time for each step of the pipeline.
+
+
+
+| STEP | INPUT TABLE NAME | TABLE SIZE (RECORDS) | PARTITIONS | ELAPSED |
+| :------------- | :------------: | :------------: | :------------: | :-------------- |
+| main_clean.py | ads_clicklog_0520 | 1,036,183 | NONE | 51 mins, 18 sec |
+| | ads_showlog_0520 | 44,946,000 | | |
+| | ads_persona_0520 | 380,000 | | |
+| main_logs.py | lookalike_02242021_clicklog | 251,271 | DAY, DID | 4 mins, 41 sec |
+| | lookalike_02242021_showlog | 12,165,993 | | |
+| main_trainready.py | lookalike_02242021_logs | 12,417,264 | DAY, DID | 15 mins, 0 sec |
+
+
+
+#### Debugging for Performance Bottlenecks
+
+One way to find a bottleneck is to measure the elapsed time for an operation.
+
+Use the following code after a specific operation to measure the elapsed time.
+
+```python
+import timeit
+
+def get_elapsed_time(df):
+    # take(1) forces evaluation of the lazy dataframe, so the measured time covers
+    # every transformation accumulated up to this point.
+    start = timeit.default_timer()
+    df.take(1)
+    end = timeit.default_timer()
+    return end - start
+```
+
+For example, in the following PySpark code, `get_elapsed_time(df)` is called in two different places. Note that each measurement covers the time from the beginning of the code up to the point where `get_elapsed_time(df)` is called.
+
+```python
+    # Excerpt from generate_trainready() in main_trainready.py; trainready_table_temp,
+    # did_bucket_num and trainready_table are defined earlier in that function.
+    batched_round = 1
+    for did_bucket in range(did_bucket_num):
+        command = """SELECT *
+                        FROM {}
+                        WHERE
+                        did_bucket= '{}' """
+        df = hive_context.sql(command.format(trainready_table_temp, did_bucket))
+        df = collect_trainready(df)
+        print(get_elapsed_time(df))
+
+        df = build_feature_array(df)
+        print(get_elapsed_time(df))
+
+        for i, feature_name in enumerate(['interval_starting_time', 'interval_keywords', 'kwi', 'kwi_show_counts', 'kwi_click_counts']):
+            df = df.withColumn(feature_name, col('metrics_list').getItem(i))
+
+        # Add did_index
+        df = df.withColumn('did_index', monotonically_increasing_id())
+        df = df.select('age', 'gender', 'did', 'did_index', 'interval_starting_time', 'interval_keywords',
+                       'kwi', 'kwi_show_counts', 'kwi_click_counts', 'did_bucket')
+
+        mode = 'overwrite' if batched_round == 1 else 'append'
+        write_to_table_with_partition(df, trainready_table, partition=('did_bucket'), mode=mode)
+        batched_round += 1
+
+    return
+```
+
+
+
+
+
+
+
+
+
+
diff --git a/Model/lookalike-model/lookalike_model/config.yml b/Model/lookalike-model/lookalike_model/config.yml
index afb7ad5..e6b1536 100644
--- a/Model/lookalike-model/lookalike_model/config.yml
+++ b/Model/lookalike-model/lookalike_model/config.yml
@@ -1,5 +1,5 @@
 product_tag: 'lookalike'
-pipeline_tag: '02182021'
+pipeline_tag: '03042021'
 persona_table_name: 'ads_persona_0520'
 showlog_table_name: 'ads_showlog_0520'
 clicklog_table_name: 'ads_clicklog_0520'
@@ -108,91 +108,53 @@ pipeline:
     clicklog_output_table: '{product_tag}_{pipeline_tag}_clicklog'
     conditions: {
       'new_slot_id_list': [  
-        '06',
-        '11',
-        '05',
-        '04',
-        '03',
-        '02',
-        '01',
-        'l03493p0r3',
-        'x0ej5xhk60kjwq',
-        'g7m2zuits8',
-        'w3wx3nv9ow5i97',
-        'a1nvkhk62q',
-        'g9iv6p4sjy',
-        'c4n08ku47t',
-        'b6le0s4qo8',
-        'd9jucwkpr3',
-        'p7gsrebd4m',
-        'a8syykhszz',
-        'l2d4ec6csv',
-        'j1430itab9wj3b',
-        's4z85pd1h8',
-        'z041bf6g4s',
-        '71bcd2720e5011e79bc8fa163e05184e',
-        'a47eavw7ex',
-        '68bcd2720e5011e79bc8fa163e05184e',
-        '66bcd2720e5011e79bc8fa163e05184e',
-        '72bcd2720e5011e79bc8fa163e05184e',
-        'f1iprgyl13',
-        'q4jtehrqn2',
-        'm1040xexan',
-        'd971z9825e',
-        'a290af82884e11e5bdec00163e291137',
-        'w9fmyd5r0i',
-        'x2fpfbm8rt',
-        'e351de37263311e6af7500163e291137',
-        'k4werqx13k',
-        '5cd1c663263511e6af7500163e291137',
-        '17dd6d8098bf11e5bdec00163e291137',
-        'd4d7362e879511e5bdec00163e291137',
-        '15e9ddce941b11e5bdec00163e291137'
+            '06',
+            '11',
+            '05',
+            '04',
+            '03',
+            '02',
+            '01',
+            'l03493p0r3',
+            'x0ej5xhk60kjwq',
+            'g7m2zuits8',
+            'w3wx3nv9ow5i97',
+            'a1nvkhk62q',
+            'g9iv6p4sjy',
+            'd47737w664',
+            'c4n08ku47t',
+            'b6le0s4qo8',
+            'd9jucwkpr3',
+            'p7gsrebd4m',
+            'a8syykhszz',
+            'l2d4ec6csv',
+            'j1430itab9wj3b',
+            'h034y5sp0i',
+            's4z85pd1h8',
+            'z041bf6g4s',
+            '71bcd2720e5011e79bc8fa163e05184e',
+            'a47eavw7ex',
+            '68bcd2720e5011e79bc8fa163e05184e',
+            '66bcd2720e5011e79bc8fa163e05184e',
+            '72bcd2720e5011e79bc8fa163e05184e',
+            'f1iprgyl13',
+            'q4jtehrqn2',
+            'm1040xexan',
+            'd971z9825e',
+            'a83jryvehg',
+            '7b0d7b55ab0c11e68b7900163e3e481d',
+            'a290af82884e11e5bdec00163e291137',
+            'w9fmyd5r0i',
+            'x2fpfbm8rt',
+            'e351de37263311e6af7500163e291137',
+            'k4werqx13k',
+            '5cd1c663263511e6af7500163e291137',
+            '17dd6d8098bf11e5bdec00163e291137',
+            'd4d7362e879511e5bdec00163e291137',
+            '15e9ddce941b11e5bdec00163e291137'
       ],
-      'new_slot_id_app_name_list': [
-        'Huawei Magazine',
-        'Huawei Magazine',
-        'Huawei Magazine',
-        'Huawei Magazine',
-        'Huawei Magazine',
-        'Huawei Magazine',
-        'Huawei Magazine',
-        'Huawei Browser',
-        'Huawei Video',
-        'Huawei Video',
-        'Huawei Video',
-        'Huawei Music',
-        'Huawei Music',
-        'Huawei Music',
-        'Huawei Music',
-        'Huawei Reading',
-        'Huawei Reading',
-        'Huawei Reading',
-        'Huawei Reading',
-        'Video 1.0',
-        'Video 2.0',
-        'Tencent Video',
-        'AI assistant',
-        'AI assistant',
-        'AI assistant',
-        'AI assistant',
-        'Huawei Video',
-        'Huawei Video',
-        'Huawei Video',
-        'Video 1.0',
-        'Themes',
-        'Huawei Music',
-        'Huawei Reading',
-        'Huawei Reading',
-        'Huawei Reading',
-        'Huawei Reading',
-        'Honor Reading',
-        'Video 1.0',
-        'Video 2.0',
-        'HiSkytone'
-      ],
-      'starting_date': '2019-12-19', #2019-12-19
-      'ending_date': '2020-04-15' #2020-04-15 #2019-12-23
+      'starting_date': '2019-12-18', #2019-12-18
+      'ending_date': '2020-04-16' #2020-04-15 #2019-12-23
     }
   main_logs:
     interval_time_in_seconds: 86400 # default=1 day, group logs in interval time.
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
index 293d6f7..9efa3c6 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
@@ -69,25 +69,21 @@ def clean_batched_log(df, df_persona, conditions, df_keywords, did_bucket_num):
     df_keywords: keywords-spread-app-id dataframe
 
     This methods:
-    1. Filters right slot-ids and add media-category.
+    1. Filters right slot-ids.
     2. Add gender and age from persona table to each record of log
     3. Add keyword to each row by looking to spread-app-id
     """
-    def filter_new_si(df, new_slot_id_list, new_slot_id_app_name_list):
+    def filter_new_si(df, new_slot_id_list):
         """
         This filters logs with pre-defined slot-ids.
         """
         new_si_set = set(new_slot_id_list)
         _udf = udf(lambda x: x in new_si_set, BooleanType())
         df = df.filter(_udf(df.slot_id))
-        slot_map = dict(zip(new_slot_id_list, slot_app_map))
-        _udf_map = udf(lambda x: slot_map[x] if x in slot_map else '', StringType())
-        df = df.withColumn('media_category', _udf_map(df.slot_id))
         return df
 
     new_slot_id_list = conditions['new_slot_id_list']
-    slot_app_map = conditions['new_slot_id_app_name_list']
-    df = filter_new_si(df, new_slot_id_list, slot_app_map)
+    df = filter_new_si(df, new_slot_id_list)
     df = df.join(df_persona, on=['did'], how='inner')
     df = df.join(df_keywords, on=['spread_app_id'], how="inner")
     df = add_day(df)
@@ -109,7 +105,7 @@ def clean_logs(cfg, df_persona, df_keywords, log_table_names):
     starting_time = datetime.strptime(start_date, "%Y-%m-%d")
     ending_time = datetime.strptime(end_date, "%Y-%m-%d")
     columns = ['spread_app_id', 'did', 'adv_id', 'media', 'slot_id', 'device_name', 'net_type', 'price_model',
-               'action_time', 'media_category', 'gender', 'age', 'keyword', 'keyword_index', 'day', 'did_bucket']
+               'action_time', 'gender', 'age', 'keyword', 'keyword_index', 'day', 'did_bucket']
 
     batched_round = 1
     while starting_time < ending_time:
@@ -150,12 +146,11 @@ def clean_logs(cfg, df_persona, df_keywords, log_table_names):
         # write_to_table(df_showlog_batched, "ads_showlog_0520_2days", mode='overwrite')
         # write_to_table(df_clicklog_batched, "ads_clicklog_0520_2days", mode='overwrite')
         # return
-
+ 
+        # Note: for mode='append', Spark might throw a socket closed exception; it is due to a bug in Spark and does not affect the data or the table.
         mode = 'overwrite' if batched_round == 1 else 'append'
 
         df_showlog_batched = clean_batched_log(df_showlog_batched, df_persona, conditions, df_keywords, did_bucket_num=did_bucket_num)
-
-        # Node: for mode='append' spark might throw socket closed exception, it was due to bug in spark and does not affect data and table.
         df_showlog_batched = df_showlog_batched.select(columns)
         write_to_table_with_partition(df_showlog_batched, showlog_output_table, partition=('day', 'did_bucket'), mode=mode)
 
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_logs.py b/Model/lookalike-model/lookalike_model/pipeline/main_logs.py
index 200d81b..b2eeb53 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_logs.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_logs.py
@@ -33,8 +33,8 @@ def join_logs(hive_context, batch_config, interval_time_in_seconds, log_table_na
     def union_logs(df_clicklog, df_showlog):
         # union click log and show log.
         columns = ['did', 'is_click', 'action_time', 'keyword',
-                   'keyword_index', 'media', 'media_category',
-                   'net_type', 'gender', 'age', 'adv_id', 'day', 'did_bucket']
+                   'keyword_index', 'media', 'net_type', 'gender',
+                   'age', 'adv_id', 'day', 'did_bucket']
 
         df_clicklog = df_clicklog.withColumn('is_click', lit(1))
         df_clicklog = df_clicklog.select(columns)
@@ -46,7 +46,13 @@ def join_logs(hive_context, batch_config, interval_time_in_seconds, log_table_na
         return df_unionlog
 
     def transform_action_time(df_logs, interval_time_in_seconds):
-        _udf_time = udf(lambda x: int(datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f').strftime("%s")), IntegerType())
+        
+        def to_timestamp(x):
+            dt = datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')
+            epoch = datetime.utcfromtimestamp(0)
+            return int((dt - epoch).total_seconds())
+
+        _udf_time = udf(to_timestamp, IntegerType())
         df_logs = df_logs.withColumn('action_time_seconds', _udf_time(col('action_time')))
 
         _udf_interval_time = udf(lambda x: x - x % interval_time_in_seconds, IntegerType())
@@ -78,7 +84,6 @@ def join_logs(hive_context, batch_config, interval_time_in_seconds, log_table_na
                         keyword, 
                         keyword_index,                    
                         media, 
-                        media_category, 
                         net_type, 
                         gender, 
                         age, 
@@ -99,9 +104,8 @@ def join_logs(hive_context, batch_config, interval_time_in_seconds, log_table_na
             df_logs_batched = transform_action_time(df_logs_batched, interval_time_in_seconds)
 
             columns = ['did', 'is_click', 'action_time', 'keyword',
-                       'keyword_index', 'media', 'media_category',
-                       'net_type', 'gender', 'age', 'adv_id',
-                       'interval_starting_time', 'action_time_seconds',
+                       'keyword_index', 'media', 'net_type', 'gender',
+                       'age', 'adv_id', 'interval_starting_time', 'action_time_seconds',
                        'day', 'did_bucket']
             df_logs_batched = df_logs_batched.select(columns)
 
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_tfrecords.py b/Model/lookalike-model/lookalike_model/pipeline/main_tfrecords.py
index d211d37..575a924 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_tfrecords.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_tfrecords.py
@@ -14,7 +14,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-
 import yaml
 import argparse
 
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
index c7b1f4b..0fc3247 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
@@ -21,14 +21,21 @@ import timeit
 
 from pyspark import SparkContext
 from pyspark.sql import functions as fn
-from pyspark.sql.functions import lit, col, udf, collect_list, concat_ws, first, create_map, monotonically_increasing_id
+from pyspark.sql.functions import lit, col, udf, collect_list, concat_ws, first, create_map, monotonically_increasing_id, row_number
 from pyspark.sql.window import Window
-from pyspark.sql.types import IntegerType, ArrayType, StringType
+from pyspark.sql.types import IntegerType, ArrayType, StringType, LongType
 from pyspark.sql import HiveContext
 from datetime import datetime, timedelta
 from util import write_to_table, write_to_table_with_partition, print_batching_info, resolve_placeholder, load_config, load_batch_config, load_df
 from itertools import chain
 
+MAX_USER_IN_BUCKET = 10**9
+
+
+def date_to_timestamp(dt):
+    epoch = datetime.utcfromtimestamp(0)
+    return int((dt - epoch).total_seconds())
+
 
 def generate_trainready(hive_context, batch_config,
                         interval_time_in_seconds,
@@ -43,7 +50,7 @@ def generate_trainready(hive_context, batch_config,
             first('gender').alias('gender'),
             first('did_bucket').alias('did_bucket'),
             fn.sum(col('is_click')).alias('kw_clicks_count'),
-            fn.count(fn.when(col('is_click') == 0, 1).otherwise(0)).alias('kw_shows_count'),
+            fn.sum(fn.when(col('is_click') == 0, 1).otherwise(0)).alias('kw_shows_count'),
         )
 
         df = df.withColumn('kwi_clicks_count', concat_ws(":", col('keyword_index'), col('kw_clicks_count')))
@@ -95,7 +102,7 @@ def generate_trainready(hive_context, batch_config,
             tmp_list = []
             for _dict in attr_map_list:
                 tmp_list.append((_dict['interval_starting_time'], _dict))
-            tmp_list.sort(reverse=True)
+            tmp_list.sort(reverse=True, key=lambda x: x[0])
 
             interval_starting_time = []
             interval_keywords = []
@@ -109,7 +116,6 @@ def generate_trainready(hive_context, batch_config,
                 kwi_show_counts.append(_dict['kwi_show_counts'])
                 kwi_click_counts.append(_dict['kwi_click_counts'])
             return [interval_starting_time, interval_keywords, kwi, kwi_show_counts, kwi_click_counts]
-
         df = df.withColumn('metrics_list', udf(udf_function, ArrayType(ArrayType(StringType())))(col('attr_map_list')))
         return df
 
@@ -130,8 +136,8 @@ def generate_trainready(hive_context, batch_config,
     ending_time = datetime.strptime(end_date, "%Y-%m-%d")
 
     all_intervals = set()
-    st = int(starting_time.strftime("%s"))
-    et = int(ending_time.strftime("%s"))
+    st = date_to_timestamp(starting_time)
+    et = date_to_timestamp(ending_time)
     x = st
     while x < et:
         interval_point = x - x % interval_time_in_seconds
@@ -191,7 +197,9 @@ def generate_trainready(hive_context, batch_config,
             df = df.withColumn(feature_name, col('metrics_list').getItem(i))
 
         # Add did_index
-        df = df.withColumn('did_index', monotonically_increasing_id())
+        w = Window.orderBy("did_bucket", "did")
+        df = df.withColumn('row_number', row_number().over(w))
+        df = df.withColumn('did_index', udf(lambda x: did_bucket*(MAX_USER_IN_BUCKET) + x, LongType())(col('row_number')))
         df = df.select('age', 'gender', 'did', 'did_index', 'interval_starting_time', 'interval_keywords',
                        'kwi', 'kwi_show_counts', 'kwi_click_counts', 'did_bucket')
 
diff --git a/Model/lookalike-model/lookalike_model/trainer/save_model.py b/Model/lookalike-model/lookalike_model/trainer/save_model.py
new file mode 100644
index 0000000..c488470
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/trainer/save_model.py
@@ -0,0 +1,129 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+#! /usr/bin/env python
+#  generate tensorflow serving servable model from trained checkpoint model.
+#
+#  The program uses information from input data and trained checkpoint model
+#  to generate TensorFlow SavedModel that can be loaded by standard
+#  tensorflow_model_server.
+#
+#  sample command:
+#      python save_model.py --predict_ads_num=30 --data_dir=./ --ckpt_dir=./save_path --saved_dir=./save_path --model_version=1
+#
+#      the resultant saved_model will locate at ./save_path/1
+
+from __future__ import print_function
+
+import os
+import sys
+
+# This is a placeholder for a Google-internal import.
+
+import tensorflow as tf
+import shutil
+from model import Model
+from collections import Iterable
+import pickle
+
+tf.app.flags.DEFINE_integer('model_version', 1, 'version number of the model.')
+tf.app.flags.DEFINE_integer('predict_batch_size', 32, 'batch size of prediction.')
+tf.app.flags.DEFINE_integer('predict_ads_num', 100, 'number of ads in prediction.')
+tf.app.flags.DEFINE_string('saved_dir', './save_path', 'directory to save generated tfserving model.')
+tf.app.flags.DEFINE_string('ckpt_dir', default='./save_path', help='checkpoint directory')
+tf.app.flags.DEFINE_string('data_dir', default='./', help='data file directory which contains cate_list')
+tf.app.flags.DEFINE_string('cate_list_fl', 'data/vars', 'input data directory')
+FLAGS = tf.app.flags.FLAGS
+
+def main(_):
+  if len(sys.argv) < 3:
+      print('Usage: saved_model.py [--model_version=y] --data_dir=xxx --ckpt_dir=xxx --saved_dir=xxx')
+      sys.exit(-1)
+  if FLAGS.model_version <= 0:
+    print('Please specify a positive value for version number.')
+    sys.exit(-1)
+
+  fn_data = os.path.join(FLAGS.data_dir, 'ad_dataset_lookalike.pkl')
+  with open(fn_data, 'rb') as f:
+    _ = pickle.load(f)
+    _ = pickle.load(f)
+    cate_list = pickle.load(f)
+    user_count, item_count, cate_count = pickle.load(f)
+
+  model = Model(user_count, item_count, cate_count, cate_list, FLAGS.predict_batch_size, FLAGS.predict_ads_num)
+
+  # load checkpoint model from training
+  print('loading checkpoint model...')
+  ckpt_file = tf.train.latest_checkpoint(FLAGS.ckpt_dir)
+
+  saver = tf.train.Saver(name='deploy_saver', var_list=None)
+  with tf.Session(config=tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))) as sess:
+    # pipe.load_vars(sess)
+    # pipe.init_iterator(sess)
+    # model = Model(user_count, item_count, cate_count, cate_list, predict_batch_size, predict_ads_num)
+    sess.run(tf.global_variables_initializer())
+    sess.run(tf.local_variables_initializer())
+
+    saver.restore(sess, ckpt_file)
+    print('Done loading checkpoint model')
+    export_path_base = FLAGS.saved_dir
+    export_path = os.path.join(tf.compat.as_bytes(export_path_base), tf.compat.as_bytes(str(FLAGS.model_version)))
+    print('Exporting trained model to', export_path)
+    if os.path.isdir(export_path):
+      shutil.rmtree(export_path)
+    builder = tf.saved_model.builder.SavedModelBuilder(export_path)
+
+    u = tf.saved_model.utils.build_tensor_info(model.u)
+    i = tf.saved_model.utils.build_tensor_info(model.i)
+    j = tf.saved_model.utils.build_tensor_info(model.j)
+    y = tf.saved_model.utils.build_tensor_info(model.y)
+    hist_i = tf.saved_model.utils.build_tensor_info(model.hist_i)
+    sl = tf.saved_model.utils.build_tensor_info(model.sl)
+    lr = tf.saved_model.utils.build_tensor_info(model.lr)
+
+    #pred = tf.saved_model.utils.build_tensor_info(graph.get_operation_by_name('m_0/add').outputs[0])
+    pred = tf.saved_model.utils.build_tensor_info(model.score_i)
+
+    labeling_signature = (
+      tf.saved_model.signature_def_utils.build_signature_def(
+        inputs={
+          "u": u,
+          "i": i,
+          "j": j,
+          # "y": y,
+          "hist_i": hist_i,
+          "sl": sl,
+          # "lr": lr,
+        },
+        outputs={
+          "pred": pred
+        },
+        method_name="tensorflow/serving/predict"))
+
+    legacy_init_op = tf.group(tf.tables_initializer(), name='legacy_init_op')
+
+    builder.add_meta_graph_and_variables(
+      sess, [tf.saved_model.tag_constants.SERVING],
+      signature_def_map={tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: labeling_signature},
+      main_op=tf.tables_initializer(),
+      strip_default_attrs=True)
+
+    builder.save()
+    print("Build Done")
+
+
+if __name__ == '__main__':
+  tf.app.run()
diff --git a/Model/lookalike-model/tests/config.yml b/Model/lookalike-model/tests/config.yml
deleted file mode 100644
index aec8df2..0000000
--- a/Model/lookalike-model/tests/config.yml
+++ /dev/null
@@ -1,130 +0,0 @@
-product_tag: 'lookalike'
-pipeline_tag: 'unittest'
-persona_table_name: 'lookalike_unittest_clean_input_persona'
-showlog_table_name: 'lookalike_unittest_clean_input_showlog'
-clicklog_table_name: 'lookalike_unittest_clean_input_clicklog'
-keywords_table: 'lookalike_unittest_clean_input_keywords'
-log:
-  level: 'WARN' # log level for spark and app
-features:
-  features: [
-              'slot_id',
-              'inter_type_cd',
-              'creat_type_cd',
-              #             'app_name',
-              'package_name',
-              'adv_device_type_cd',
-              'os_ver',
-              'device_name',
-              'os_language',
-              'req_date',
-              'media_busin_id',
-              'plat_income_amt',
-              'site_id',
-              'device_size',
-              'slot_status_cd',
-              'sdk_ver',
-              'screen_resolution',
-              'net_type',
-              'carrier',
-              'req_time',
-              'return_time',
-              'show_id',
-              'show_time',
-              'inside_channel_id',
-              'bill_env',
-              'media_busin_income_amt',
-              'adv_prim_class_cd',
-              'ssp_dsp_channel_id',
-              'adv_bill_mode_cd',
-              'oper_cnt',
-              'etl_time',
-              'show_freq_ctrl_mode',
-              'client_report_event_id',
-              'record_time_msec',
-              'repeat_report_count',
-              'last_report_fail_reason',
-              'position_id',
-              'award_type_cd',
-              'adx_order_id',
-              'content_down_method_cd',
-              'trade_type',
-              'spread_app_id',
-              'spread_app_industry_id',
-              'spread_daily_budget',
-              'adx_deal_cd',
-              'device_id_type_cd',
-              'oaid',
-              'enable_track_flg',
-              'valid_cd',
-              'max_adv_count',
-              'evt_jump_src_cd',
-              'evt_jump_src_fail_cd',
-              'chk_result',
-              'second_agent_id',
-              'adv_valid_period',
-              'first_agent_id',
-              'dynamic_creat_template_id',
-              'goods_lbry_id',
-              'goods_id',
-              'indu_type',
-              'click_success_jump_dst',
-              'evt_uuid',
-              'pay_type_flg',
-              'cuerrency',
-              'cfolder_type_flg',
-              'return_recommend_val',
-              'adx_order_purch_cnt',
-              'total_budget',
-              'cp_id',
-              'media_customize_acct_par1',
-              'teleplay_id',
-              'exp_cnt',
-              'exp_src',
-              'adv_show_contin_duration',
-              'max_show_ratio',
-              'adx_order_total_purch_cnt',
-              'media_receipt_acct',
-              'media_receipt_currency',
-              'service_judge_invalid_rule_code',
-              'realtm_charge_result_rule_code',
-              'plat_actual_cash_income',
-              'daily_show_max_cnt',
-              'slot_size',
-              'test_adv_flg',
-              'app_ver',
-              'adv_type',
-              'adv_prim_id',
-              'order_id',
-              'task_id',
-              'pps_inside_exprmt_ab_tag']
-pipeline:
-  main_clean:  
-    did_bucket_num: 2 # Number of partitions for did
-    load_logs_in_minutes: 1440 #1440/day, original=14400
-    create_keywords: False # set True for first run, then keep False to use the created table.
-    persona_output_table: 'lookalike_unittest_clean_output_persona'
-    showlog_output_table: 'lookalike_unittest_clean_output_showlog'
-    clicklog_output_table: 'lookalike_unittest_clean_output_clicklog'
-    conditions: {
-      'new_slot_id_list': [
-          'abcdef0', 'abcdef1', 'abcdef2', 'abcdef3', 'abcdef4', 
-          'abcdef5', 'abcdef6', 'abcdef7', 'abcdef8', 'abcdef9'
-      ],
-      'new_slot_id_app_name_list': [
-          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading', 
-          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading'
-      ],
-      'starting_date': '2020-01-01',
-      'ending_date': '2020-01-11'
-    }
-  main_logs:
-    interval_time_in_seconds: 86400 # default=1 day, group logs in interval time.
-    logs_output_table_name: '{product_tag}_{pipeline_tag}_logs'
-  main_trainready:
-    trainready_output_table: '{product_tag}_{pipeline_tag}_trainready'
-  tfrecords:
-    tfrecords_statistics_path: '{product_tag}_{pipeline_tag}_tfrecord_statistics.pkl'
-    tfrecords_hdfs_path: '{product_tag}_{pipeline_tag}_tfrecord' # it is hdfs location for tfrecords, over-writes the existing files
-  cutting_date: 1584748800
-  length: 10
diff --git a/Model/lookalike-model/tests/pipeline/config_clean.yml b/Model/lookalike-model/tests/pipeline/config_clean.yml
new file mode 100644
index 0000000..8ae2c63
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/config_clean.yml
@@ -0,0 +1,39 @@
+product_tag: 'lookalike'
+pipeline_tag: 'unittest'
+persona_table_name: 'lookalike_unittest_clean_input_persona'
+showlog_table_name: 'lookalike_unittest_clean_input_showlog'
+clicklog_table_name: 'lookalike_unittest_clean_input_clicklog'
+keywords_table: 'lookalike_unittest_clean_input_keywords'
+log:
+  level: 'ERROR' # log level for spark and app
+pipeline:
+  main_clean:  
+    did_bucket_num: 2 # Number of partitions for did
+    load_logs_in_minutes: 1440 #1440/day, original=14400
+    create_keywords: False # set True for first run, then keep False to use the created table.
+    persona_output_table: 'lookalike_unittest_clean_output_persona'
+    showlog_output_table: 'lookalike_unittest_clean_output_showlog'
+    clicklog_output_table: 'lookalike_unittest_clean_output_clicklog'
+    conditions: {
+      'new_slot_id_list': [
+          'abcdef0', 'abcdef1', 'abcdef2', 'abcdef3', 'abcdef4', 
+          'abcdef5', 'abcdef6', 'abcdef7', 'abcdef8', 'abcdef9'
+      ],
+      'new_slot_id_app_name_list': [
+          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading', 
+          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading'
+      ],
+      'starting_date': '2020-01-01',
+      'ending_date': '2020-01-11'
+    }
+  main_logs:
+    interval_time_in_seconds: 86400 # default=1 day, group logs in interval time.
+    logs_output_table_name: '{product_tag}_{pipeline_tag}_logs'
+  main_trainready:
+    trainready_output_table: '{product_tag}_{pipeline_tag}_trainready'
+  tfrecords:
+    tfrecords_statistics_path: '{product_tag}_{pipeline_tag}_tfrecord_statistics.pkl'
+    tfrecords_hdfs_path: '{product_tag}_{pipeline_tag}_tfrecord' # it is hdfs location for tfrecords, over-writes the existing files
+  cutting_date: 1584748800
+  length: 10
+
diff --git a/Model/lookalike-model/tests/pipeline/config_logs.yml b/Model/lookalike-model/tests/pipeline/config_logs.yml
new file mode 100644
index 0000000..11413dc
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/config_logs.yml
@@ -0,0 +1,26 @@
+product_tag: 'lookalike'
+pipeline_tag: 'unittest'
+log:
+  level: 'ERROR' # log level for spark and app
+pipeline:
+  main_clean:  
+    did_bucket_num: 2 # Number of partitions for did
+    load_logs_in_minutes: 1440 #1440/day, original=14400
+    showlog_output_table: 'lookalike_unittest_logs_input_showlog'
+    clicklog_output_table: 'lookalike_unittest_logs_input_clicklog'
+    conditions: {
+      'new_slot_id_list': [
+          'abcdef0', 'abcdef1', 'abcdef2', 'abcdef3', 'abcdef4', 
+          'abcdef5', 'abcdef6', 'abcdef7', 'abcdef8', 'abcdef9'
+      ],
+      'new_slot_id_app_name_list': [
+          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading', 
+          'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading'
+      ],
+      'starting_date': '2020-01-01',
+      'ending_date': '2020-01-11'
+    }
+  main_logs:
+    interval_time_in_seconds: 86400 # default=1 day, group logs in interval time.
+    logs_output_table_name: 'lookalike_unittest_logs_output_logs'
+
diff --git a/Model/lookalike-model/tests/pipeline/data_generator.py b/Model/lookalike-model/tests/pipeline/data_generator.py
index fbd2d51..1b33f91 100644
--- a/Model/lookalike-model/tests/pipeline/data_generator.py
+++ b/Model/lookalike-model/tests/pipeline/data_generator.py
@@ -14,6 +14,7 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+import sys
 from pyspark.sql.types import StringType, StructField, StructType, IntegerType
 from lookalike_model.pipeline.main_clean import add_did_bucket
 from lookalike_model.pipeline.util import write_to_table
@@ -29,6 +30,11 @@ def create_persona_table (spark, table_name):
     df.printSchema()
     write_to_table(df, table_name)
 
+# Creates cleaned persona data and writes it to Hive.
+def create_log_table (spark, table_name):
+    df = create_cleaned_persona(spark)
+    write_to_table(df, table_name)
+
 # Creates raw clicklog data and writes it to Hive.
 def create_clicklog_table (spark, table_name):
     df = create_raw_log(spark)
@@ -47,6 +53,11 @@ def create_showlog_table (spark, table_name):
     df.printSchema()
     write_to_table(df, table_name)
 
+# Creates cleaned click/showlog data and writes it to Hive.
+def create_log_table (spark, table_name):
+    df = create_cleaned_log(spark)
+    write_to_table(df, table_name)
+
 # Creates keyword data and writes it to Hive.
 def create_keywords_table (spark, table_name):
     df = create_keywords(spark)
@@ -118,17 +129,17 @@ def create_cleaned_persona (spark, bucket_num = 4):
 # Returns a dataframe with unclean log data.
 def create_raw_log (spark):
     data = [
-        ('0000001', '1000', 'splash', 'abcdef0', 'C000', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56'),
-        ('0000002', '1000', 'splash', 'abcdef1', 'C001', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56'),
-        ('0000003', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00',   '4G', 'CPD', '2020-01-03 12:34:56'),
-        ('0000004', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00',   '4G', 'CPD', '2020-01-04 12:34:56'),
-        ('0000005', '1002', 'splash', 'abcdef4', 'C011', 'DEF-AL00', 'WIFI', 'CPM', '2020-01-05 12:34:56'),
-        ('0000006', '1002', 'splash', 'abcdef5', 'C012', 'DEF-AL00', 'WIFI', 'CPM', '2020-01-06 12:34:56'),
-        ('0000007', '1003', 'splash', 'abcdef6', 'C020', 'XYZ-AL00',   '4G', 'CPT', '2020-01-07 12:34:56'),
-        ('0000008', '1003', 'splash', 'abcdef7', 'C021', 'XYZ-AL00',   '4G', 'CPT', '2020-01-08 12:34:56'),
-        ('0000009', '1004', 'splash', 'abcdef8', 'C022', 'TUV-AL00', 'WIFI', 'CPC', '2020-01-09 12:34:56'),
-        ('0000010', '1004', 'splash', 'abcdef9', 'C023', 'TUV-AL00', 'WIFI', 'CPC', '2020-01-10 12:34:56'),
-        ('0000001', '1000', 'native', 'abcde10', 'C004', 'JKL-AL00',   '4G', 'CPD', '2020-01-11 12:34:56'),
+        ('0000001', '1000', 'splash', 'abcdef0', 'C000', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56.78'),
+        ('0000002', '1000', 'splash', 'abcdef1', 'C001', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56.78'),
+        ('0000003', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00',   '4G', 'CPD', '2020-01-03 12:34:56.78'),
+        ('0000004', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00',   '4G', 'CPD', '2020-01-04 12:34:56.78'),
+        ('0000005', '1002', 'splash', 'abcdef4', 'C011', 'DEF-AL00', 'WIFI', 'CPM', '2020-01-05 12:34:56.78'),
+        ('0000006', '1002', 'splash', 'abcdef5', 'C012', 'DEF-AL00', 'WIFI', 'CPM', '2020-01-06 12:34:56.78'),
+        ('0000007', '1003', 'splash', 'abcdef6', 'C020', 'XYZ-AL00',   '4G', 'CPT', '2020-01-07 12:34:56.78'),
+        ('0000008', '1003', 'splash', 'abcdef7', 'C021', 'XYZ-AL00',   '4G', 'CPT', '2020-01-08 12:34:56.78'),
+        ('0000009', '1004', 'splash', 'abcdef8', 'C022', 'TUV-AL00', 'WIFI', 'CPC', '2020-01-09 12:34:56.78'),
+        ('0000010', '1004', 'splash', 'abcdef9', 'C023', 'TUV-AL00', 'WIFI', 'CPC', '2020-01-10 12:34:56.78'),
+        ('0000001', '1000', 'native', 'abcde10', 'C004', 'JKL-AL00',   '4G', 'CPD', '2020-01-11 12:34:56.78'), # Slot ID not in list so will be filtered.
     ]
 
     schema = StructType([
@@ -145,6 +156,42 @@ def create_raw_log (spark):
 
     return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
 
+# Returns a dataframe with cleaned log data.
+def create_cleaned_log (spark):
+    data = [
+        ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
+        ('C001', '0000002', '1000', 'splash', 'abcdef1', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56.78', 'Huawei Browser', 1, 0, 'travel', '1', '2020-01-02', '1', ),
+        ('C002', '0000003', '1001', 'native', 'abcdef2', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78', 'Huawei Video', 0, 1, 'travel', '1', '2020-01-03', '1', ),
+        ('C010', '0000004', '1001', 'native', 'abcdef3', 'ABC-AL00', '4G', 'CPD', '2020-01-04 12:34:56.78', 'Huawei Music', 1, 1, 'game-avg', '2', '2020-01-04', '1', ),
+        ('C011', '0000005', '1002', 'splash', 'abcdef4', 'DEF-AL00', 'WIFI', 'CPM', '2020-01-05 12:34:56.78', 'Huawei Reading', 0, 2, 'game-avg', '2', '2020-01-05', '1', ),
+        ('C012', '0000006', '1002', 'splash', 'abcdef5', 'DEF-AL00', 'WIFI', 'CPM', '2020-01-06 12:34:56.78', 'Huawei Magazine', 1, 2, 'game-avg', '2', '2020-01-06', '0', ),
+        ('C020', '0000007', '1003', 'splash', 'abcdef6', 'XYZ-AL00', '4G', 'CPT', '2020-01-07 12:34:56.78', 'Huawei Browser', 0, 3, 'reading', '3', '2020-01-07', '0', ),
+        ('C021', '0000008', '1003', 'splash', 'abcdef7', 'XYZ-AL00', '4G', 'CPT', '2020-01-08 12:34:56.78', 'Huawei Video', 1, 3, 'reading', '3', '2020-01-08', '0', ),
+        ('C022', '0000009', '1004', 'splash', 'abcdef8', 'TUV-AL00', 'WIFI', 'CPC', '2020-01-09 12:34:56.78', 'Huawei Music', 0, 4, 'reading', '3', '2020-01-09', '0', ),
+        ('C023', '0000010', '1004', 'splash', 'abcdef9', 'TUV-AL00', 'WIFI', 'CPC', '2020-01-10 12:34:56.78', 'Huawei Reading', 1, 4, 'reading', '3', '2020-01-10', '1', ),
+    ]
+
+    schema = StructType([
+        StructField('spread_app_id', StringType(), True),
+        StructField('did', StringType(), True),
+        StructField('adv_id', StringType(), True),
+        StructField('media', StringType(), True),
+        StructField('slot_id', StringType(), True),
+        StructField('device_name', StringType(), True),
+        StructField('net_type', StringType(), True),
+        StructField('price_model', StringType(), True),
+        StructField('action_time', StringType(), True),
+        StructField('media_category', StringType(), True),
+        StructField('gender', IntegerType(), True),
+        StructField('age', IntegerType(), True),
+        StructField('keyword', StringType(), True),
+        StructField('keyword_index', StringType(), True),
+        StructField('day', StringType(), True),
+        StructField('did_bucket', StringType(), True),
+    ])
+
+    return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
 # Returns a dataframe with keyword data.
 def create_keywords(spark):
     data = [
@@ -172,3 +219,27 @@ def create_keywords(spark):
     ])
 
     return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+# Prints to screen the code to generate the given data frame.
+def print_df_generator_code (df):
+    columns = df.columns
+    print
+    print('    data = [')
+    for row in df.collect():
+        sys.stdout.write('        (')
+        for column in columns:
+            if isinstance(df.schema[column].dataType, StringType):
+                sys.stdout.write('\'%s\', ' % row[column])
+            else:
+                sys.stdout.write('%s, ' % row[column])
+        print('),')
+    print('    ]')
+    print
+    print('    schema = StructType([')
+    for column in columns:
+        print('        StructField(\'%s\', %s(), True),' % (column, type(df.schema[column].dataType).__name__))
+    print('    ])')
+    print
+
+
+
diff --git a/Model/lookalike-model/tests/pipeline/test_main_clean.py b/Model/lookalike-model/tests/pipeline/test_main_clean.py
index 4be14a8..9163c86 100644
--- a/Model/lookalike-model/tests/pipeline/test_main_clean.py
+++ b/Model/lookalike-model/tests/pipeline/test_main_clean.py
@@ -110,7 +110,7 @@ class TestMainClean(unittest.TestCase):
     # Testing data look up and cleaning process for clicklog and showlog data.
     def test_clean_logs (self):
         print('*** Running test_clean_logs ***')
-        with open('config.yml', 'r') as ymlfile:
+        with open('pipeline/config_clean.yml', 'r') as ymlfile:
             cfg = yaml.safe_load(ymlfile)
 
         showlog_table = cfg['showlog_table_name']
@@ -154,7 +154,7 @@ class TestMainClean(unittest.TestCase):
     # Testing full data cleaning process for persona, clicklog, and showlog data.
     def test_run (self):
         print('*** Running test_run ***')
-        with open('config.yml', 'r') as ymlfile:
+        with open('pipeline/config_clean.yml', 'r') as ymlfile:
             cfg = yaml.safe_load(ymlfile)
 
         # Create the persona, keywords, clicklog and showlog tables.
@@ -189,11 +189,13 @@ class TestMainClean(unittest.TestCase):
 
         # Validate the cleaned clicklog table.
         df_clicklog = util.load_df(self.hive_context, clicklog_output_table)
+        print_df_generator_code(df_clicklog.sort('did'))
         df_log = create_raw_log(self.spark)
         self.validate_cleaned_log(df_clicklog, conditions, df_persona, df_keywords, df_log, bucket_num)
 
         # Validate the cleaned showlog table.
         df_showlog = util.load_df(self.hive_context, clicklog_output_table)
+        print_df_generator_code(df_showlog.sort('did'))
         df_log = create_raw_log(self.spark)
         self.validate_cleaned_log(df_showlog, conditions, df_persona, df_keywords, df_log, bucket_num)
 
@@ -238,36 +240,35 @@ class TestMainClean(unittest.TestCase):
     def validate_cleaned_log (self, df, conditions, df_persona, df_keywords, df_log, bucket_num):
         # Verify the column names.
         columns = ['spread_app_id', 'did', 'adv_id', 'media', 'slot_id', 'device_name', 
-            'net_type', 'price_model', 'action_time', 'media_category', 'gender', 'age', 
+            'net_type', 'price_model', 'action_time', 'gender', 'age', 
             'keyword', 'keyword_index', 'day', 'did_bucket']
         for name in columns:
             self.assertTrue(name in df.columns)
 
-        # Create a map between slot IDs and app names.
-        slot_id_map = { i: j for (i, j) in zip(conditions['new_slot_id_list'], conditions['new_slot_id_app_name_list']) }
+        # Verify the number of rows.
+        # The raw log has one entry that will be filtered out, so the expected count is adjusted accordingly.
+        self.assertEqual(df.count(), df_log.count() - 1)
+
+        # Helper method for verifying table joins.
+        def assert_row_value (row, df_match, field_name, join_field):
+            self.assertEqual(row[field_name], df_match.filter(col(join_field) == row[join_field]).collect()[0][field_name])
 
         # Check the row values.
         for row in df.collect():
             self.assertTrue(row['slot_id'] in conditions['new_slot_id_list'])
-            self.assertTrue(row['media_category'] in conditions['new_slot_id_app_name_list'])
             self.assertEqual(row['day'], row['action_time'].split()[0])
             self.assertTrue(int(row['did_bucket']) < bucket_num)
-            self.assertEqual(row['media_category'], slot_id_map[row['slot_id']])
-            self.assert_row_value(row, df_persona, 'gender', 'did')
-            self.assert_row_value(row, df_persona, 'age', 'did')
-            self.assert_row_value(row, df_keywords, 'keyword', 'spread_app_id')
-            self.assert_row_value(row, df_keywords, 'keyword_index', 'spread_app_id')
-            self.assert_row_value(row, df_log, 'adv_id', 'did')
-            self.assert_row_value(row, df_log, 'media', 'did')
-            self.assert_row_value(row, df_log, 'slot_id', 'did')
-            self.assert_row_value(row, df_log, 'device_name', 'did')
-            self.assert_row_value(row, df_log, 'net_type', 'did')
-            self.assert_row_value(row, df_log, 'price_model', 'did')
-            self.assert_row_value(row, df_log, 'action_time', 'did')
-
-    # Helper method for verifying table joins.
-    def assert_row_value (self, row, df_match, field_name, join_field):
-        self.assertEqual(row[field_name], df_match.filter(col(join_field) == row[join_field]).collect()[0][field_name])
+            assert_row_value(row, df_persona, 'gender', 'did')
+            assert_row_value(row, df_persona, 'age', 'did')
+            assert_row_value(row, df_keywords, 'keyword', 'spread_app_id')
+            assert_row_value(row, df_keywords, 'keyword_index', 'spread_app_id')
+            assert_row_value(row, df_log, 'adv_id', 'did')
+            assert_row_value(row, df_log, 'media', 'did')
+            assert_row_value(row, df_log, 'slot_id', 'did')
+            assert_row_value(row, df_log, 'device_name', 'did')
+            assert_row_value(row, df_log, 'net_type', 'did')
+            assert_row_value(row, df_log, 'price_model', 'did')
+            assert_row_value(row, df_log, 'action_time', 'did')
 
 
 # Runs the tests.
diff --git a/Model/lookalike-model/tests/pipeline/test_main_logs.py b/Model/lookalike-model/tests/pipeline/test_main_logs.py
new file mode 100644
index 0000000..46112b7
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/test_main_logs.py
@@ -0,0 +1,114 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from lookalike_model.pipeline import main_logs, util
+from data_generator import *
+
+class TestMainLogs(unittest.TestCase):
+
+    def setUp (self):
+        # Set the log level.
+        sc = SparkContext.getOrCreate()
+        sc.setLogLevel('ERROR')
+
+        # Initialize the Spark session
+        self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate()
+        self.hive_context = HiveContext(sc)
+
+    def test_join_logs (self):
+        print('*** Running test_join_logs test ***')
+
+        # Load the configuration data.
+        with open('pipeline/config_logs.yml', 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+        showlog_table = cfg['pipeline']['main_clean']['showlog_output_table']
+        clicklog_table = cfg['pipeline']['main_clean']['clicklog_output_table']
+        log_output_table = cfg['pipeline']['main_logs']['logs_output_table_name']
+        log_table_names = (showlog_table, clicklog_table, log_output_table)
+
+        did_bucket_num = cfg['pipeline']['main_clean']['did_bucket_num']
+        interval_time_in_seconds = cfg['pipeline']['main_logs']['interval_time_in_seconds']
+
+        # Create the input data tables.
+        create_log_table(self.spark, clicklog_table)
+        create_log_table(self.spark, showlog_table)
+
+        # Clear the output from any previous runs.
+        util.drop_table(self.hive_context, log_output_table)
+
+        # Run the method to be tested.
+        main_logs.join_logs(self.hive_context, util.load_batch_config(cfg), interval_time_in_seconds, log_table_names, did_bucket_num)
+
+        # Validate the output.
+        df = util.load_df(self.hive_context, log_output_table)
+        self.validate_unified_logs(df, create_cleaned_log(self.spark))
+
+    def test_run (self):
+        print('*** Running test_run test ***')
+
+        # Load the configuration data.
+        with open('pipeline/config_logs.yml', 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+        # Create the input data tables.
+        showlog_table = cfg['pipeline']['main_clean']['showlog_output_table']
+        clicklog_table = cfg['pipeline']['main_clean']['clicklog_output_table']
+        create_log_table(self.spark, clicklog_table)
+        create_log_table(self.spark, showlog_table)
+
+        # Clear the output from any previous runs.
+        log_output_table = cfg['pipeline']['main_logs']['logs_output_table_name']
+        util.drop_table(self.hive_context, log_output_table)
+
+        # Run the method to be tested.
+        main_logs.run(self.hive_context, cfg)
+
+        # Validate the output.
+        df = util.load_df(self.hive_context, log_output_table)
+        print_df_generator_code(df.sort('did', 'is_click'))
+        self.validate_unified_logs(df, create_cleaned_log(self.spark))
+
+
+    def validate_unified_logs (self, df, df_log):
+        # Verify the column names.
+        columns = ['is_click', 'did', 'adv_id', 'media', 
+            'net_type', 'action_time', 'gender', 'age', 
+            'keyword', 'keyword_index', 'day', 'did_bucket']
+        for name in columns:
+            self.assertTrue(name in df.columns)
+
+        self.assertEqual(df.count(), 2*df_log.count())
+
+        df = df.sort('did', 'is_click')
+        rows = df.collect()
+        for i in range(0, len(rows), 2):
+            for column in columns:
+                if column == 'is_click':
+                    self.assertNotEqual(rows[i][column], rows[i+1][column])
+                else:
+                    self.assertEqual(rows[i][column], rows[i+1][column])
+
+
+
+# Runs the tests.
+if __name__ == '__main__':
+    # Run the unit tests.
+    unittest.main()
diff --git a/Model/lookalike-model/tests/run_test.sh b/Model/lookalike-model/tests/run_test.sh
index d5bcbf9..be1d942 100644
--- a/Model/lookalike-model/tests/run_test.sh
+++ b/Model/lookalike-model/tests/run_test.sh
@@ -9,3 +9,9 @@ then
     spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_clean.py
 fi
 
+# test_main_logs: merges click and show log data.
+if true
+then
+    spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_logs.py
+fi
+