Posted to commits@systemds.apache.org by mb...@apache.org on 2020/08/28 20:53:17 UTC

[systemds] branch master updated: [SYSTEMDS-2641] Updated slice finding algorithms and tests

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c8a96b0  [SYSTEMDS-2641] Updated slice finding algorithms and tests
c8a96b0 is described below

commit c8a96b079e55f0cc7cd80c8827b2225336243307
Author: gilgenbergg <la...@gmail.com>
AuthorDate: Fri Aug 28 22:50:33 2020 +0200

    [SYSTEMDS-2641] Updated slice finding algorithms and tests
    
    Closes #1041.
---
 scripts/staging/slicing/base/SparkNode.py          |   8 +-
 scripts/staging/slicing/base/node.py               |  18 +-
 scripts/staging/slicing/base/slicer.py             |  63 ++-
 scripts/staging/slicing/base/top_k.py              |   4 +-
 scripts/staging/slicing/base/union_slicer.py       |  32 +-
 .../slicing/spark_modules/join_data_parallel.py    |  17 +-
 .../staging/slicing/spark_modules/spark_slicer.py  |  10 +-
 .../slicing/spark_modules/spark_union_slicer.py    |  10 +-
 .../staging/slicing/spark_modules/spark_utils.py   | 138 -------
 .../slicing/spark_modules/union_data_parallel.py   |  11 +-
 .../compas_bd.py}                                  |  81 ++--
 .../slicing/tests/classification/test_adult.py     |   9 +-
 .../{test_adult.py => test_compas.py}              |  64 ++-
 .../slicing/tests/regression/bd_spark_salary.py    |  15 +-
 .../slicing/tests/regression/spark_salary.py       |  56 ++-
 .../staging/slicing/tests/regression/toy_tests.py  | 428 +++++++++++++++++++++
 scripts/staging/slicing/utils/model_generator.py   |  77 ++++
 17 files changed, 742 insertions(+), 299 deletions(-)

diff --git a/scripts/staging/slicing/base/SparkNode.py b/scripts/staging/slicing/base/SparkNode.py
index a123624..81bfcb9 100644
--- a/scripts/staging/slicing/base/SparkNode.py
+++ b/scripts/staging/slicing/base/SparkNode.py
@@ -46,6 +46,7 @@ class SparkNode:
         self.size = 0
         self.score = 0
         self.loss = 0
+        self.s_upper = 0
         self.s_lower = 1
         self.key = ''
 
@@ -61,8 +62,6 @@ class SparkNode:
 
     def process_slice(self, loss_type):
         mask = self.make_slice_mask()
-        print("mask")
-        print(mask)
         if loss_type == 0:
             self.calc_l2(mask)
         elif loss_type == 1:
@@ -82,7 +81,6 @@ class SparkNode:
     def calc_l2(self, mask):
         max_tuple_error = 0
         sum_error = 0
-        size = 0
         filtered = self.filter_by_mask(mask)
         self.size = len(filtered)
         for row in filtered:
@@ -150,8 +148,8 @@ class SparkNode:
     def make_key(self):
         return self.name
 
-    def check_constraint(self, top_k, x_size, alpha):
-        return self.score >= top_k.min_score and self.size >= x_size / alpha
+    def check_constraint(self, top_k, x_size, alpha, cur_min):
+        return self.score >= cur_min and self.size >= x_size / alpha
 
     def check_bounds(self, top_k, x_size, alpha):
         return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
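
For context, the signature change above threads the current minimum top-k score through as an explicit cur_min argument instead of reading top_k.min_score inside the node. A minimal driver-side sketch of the intended usage, assuming the names from this diff (the candidate loop itself is illustrative, not repo code):

    # Sketch: freeze the pruning threshold once per level, then test every
    # candidate of that level against the same frozen value.
    cur_min = top_k.min_score
    for node in level_nodes:  # level_nodes is a hypothetical collection
        # keep a slice only if its score beats the frozen threshold and its
        # size is at least x_size / alpha (the size-significance constraint)
        if node.check_constraint(top_k, x_size, alpha, cur_min):
            top_k.add_new_top_slice(node)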
diff --git a/scripts/staging/slicing/base/node.py b/scripts/staging/slicing/base/node.py
index 4d80651..f7b553b 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/node.py
@@ -108,11 +108,13 @@ class Node:
         return list(filter(lambda row: all(row[1][attr] == 1 for attr in mask), self.complete_x))
 
     def calc_s_upper(self, cur_lvl):
-        cur_min = self.parents[0].size
-        for parent in self.parents:
-            if cur_lvl == 1:
+        if cur_lvl == 1:
+            cur_min = self.parents[0].size
+            for parent in self.parents:
                 cur_min = min(cur_min, parent.size)
-            else:
+        else:
+            cur_min = self.parents[0].s_upper
+            for parent in self.parents:
                 cur_min = min(cur_min, parent.s_upper)
         return cur_min
 
@@ -157,11 +159,11 @@ class Node:
     def make_key(self, new_id):
         return new_id, self.name
 
-    def check_constraint(self, top_k, x_size, alpha):
-        return self.score >= top_k.min_score and self.size >= x_size / alpha
+    def check_constraint(self, top_k, x_size, alpha, cur_min):
+        return self.score >= cur_min and self.size >= x_size / alpha
 
-    def check_bounds(self, top_k, x_size, alpha):
-        return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
+    def check_bounds(self, x_size, alpha, cur_min):
+        return self.s_upper >= x_size / alpha and self.c_upper >= cur_min
 
     def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
         if self.s_upper:
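
For reference, the restructured calc_s_upper computes the size upper bound per level: on level 1 it is the minimum of the parents' exact sizes, on deeper levels the minimum of the parents' own upper bounds. A self-contained sketch of the same rule (the parents argument is illustrative, not repo code):

    # A slice can never be larger than its smallest parent, so the size
    # upper bound is the minimum over all parents of either the exact
    # parent size (level 1) or the parent's size upper bound (level > 1).
    def s_upper_bound(parents, cur_lvl):
        if cur_lvl == 1:
            return min(p.size for p in parents)
        return min(p.s_upper for p in parents)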
diff --git a/scripts/staging/slicing/base/slicer.py b/scripts/staging/slicing/base/slicer.py
index a3c4a48..77851b6 100644
--- a/scripts/staging/slicing/base/slicer.py
+++ b/scripts/staging/slicing/base/slicer.py
@@ -21,6 +21,7 @@
 
 from slicing.base.node import Node
 from slicing.base.top_k import Topk
+import matplotlib.pyplot as plt
 
 
 # optimization function calculation:
@@ -39,9 +40,11 @@ def opt_fun(fi, si, f, x_size, w):
 # invalid combination example: node ABC + CDE (on 4th level) // result node - ABCDE (absurd for 4th level)
 def slice_name_nonsense(node_i, node_j, cur_lvl):
     attr1 = list(map(lambda x: x[0].split("_")[0], node_i.attributes))
+    key1 = node_i.key[0]
     attr2 = list(map(lambda x: x[0].split("_")[0], node_j.attributes))
+    key2 = node_j.key[0]
     commons = len(list(set(attr1) & set(attr2)))
-    return commons == cur_lvl - 1
+    return commons == cur_lvl - 1 and key1 < key2
 
 
 def union(lst1, lst2):
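
The tightened check now requires exactly cur_lvl - 1 shared base attributes plus an ordering on the key so each unordered pair is enumerated once. A small worked example, assuming attribute names of the form "attr_value" (illustrative data, not repo data):

    # Worked example of the join-validity rule at cur_lvl = 2:
    attr_i = ["a_1", "b_2"]   # level-2 node with base attributes {a, b}
    attr_j = ["a_1", "c_3"]   # level-2 node with base attributes {a, c}
    base_i = {x.split("_")[0] for x in attr_i}
    base_j = {x.split("_")[0] for x in attr_j}
    cur_lvl = 2
    # one common base attribute == cur_lvl - 1, so joining yields the
    # proper level-3 candidate {a, b, c}; key1 < key2 deduplicates pairs
    valid = len(base_i & base_j) == cur_lvl - 1  # True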
@@ -50,7 +53,7 @@ def union(lst1, lst2):
 
 
 def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, top_k, alpha, w):
-    first_level = []
+    first_level = {}
     counter = 0
     all_nodes = {}
     # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds)
@@ -65,10 +68,10 @@ def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, los
         new_node.process_slice(loss_type)
         new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w)
         new_node.c_upper = new_node.score
-        first_level.append(new_node)
+        first_level[new_node.key] = new_node
         new_node.print_debug(top_k, 0)
         # constraints for 1st level nodes to be problematic candidates
-        if new_node.check_constraint(top_k, x_size, alpha):
+        if new_node.check_constraint(top_k, x_size, alpha, 1):
             # this method updates top k slices if needed
             top_k.add_new_top_slice(new_node)
         counter = counter + 1
@@ -77,9 +80,10 @@ def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, los
 
 def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug, alpha, w, loss_type, b_update, cur_lvl,
               all_nodes, top_k, cur_lvl_nodes):
-    for node_j in range(len(prev_lvl)):
+    for node_j in prev_lvl:
         flag = slice_name_nonsense(prev_lvl[node_i], prev_lvl[node_j], cur_lvl)
-        if flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
+        required_number = len(union(prev_lvl[node_i].attributes, prev_lvl[node_j].attributes))
+        if flag and required_number == cur_lvl + 1:
             new_node = Node(complete_x, loss, x_size, y_test, errors)
             parents_set = set(new_node.parents)
             parents_set.add(prev_lvl[node_i])
@@ -92,8 +96,8 @@ def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug,
             new_node.name = new_node.make_name()
             new_id = len(all_nodes)
             new_node.key = new_node.make_key(new_id)
-            if new_node.key[1] in all_nodes:
-                existing_item = all_nodes[new_node.key[1]]
+            if new_node.key[1] in cur_lvl_nodes:
+                existing_item = cur_lvl_nodes[new_node.key[1]]
                 parents_set = set(existing_item.parents)
                 existing_item.parents = parents_set
                 if b_update:
@@ -104,18 +108,18 @@ def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug,
                     new_node.update_bounds(s_upper, s_lower, e_upper, e_max_upper, w)
             else:
                 new_node.calc_bounds(cur_lvl, w)
-                all_nodes[new_node.key[1]] = new_node
+                all_nodes[new_node.key[0]] = new_node
                # check if concrete data should be extracted or not (only for those whose score
                # upper bound is big enough and whose subset size is big enough)
-                to_slice = new_node.check_bounds(top_k, x_size, alpha)
+                to_slice = new_node.check_bounds(x_size, alpha, top_k.min_score)
                 if to_slice:
                     new_node.process_slice(loss_type)
                     new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w)
                    # we decide whether to add the node to the current level nodes (in order to make
                    # new combinations on the next one or not) based on its score value
-                    if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    if new_node.check_constraint(top_k, x_size, alpha, top_k.min_score) and new_node.key not in top_k.keys:
                         top_k.add_new_top_slice(new_node)
-                    cur_lvl_nodes.append(new_node)
+                    cur_lvl_nodes[new_node.key[1]] = new_node
                 if debug:
                     new_node.print_debug(top_k, cur_lvl)
     return cur_lvl_nodes, all_nodes
@@ -131,30 +135,51 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
     levels = []
     top_k = Topk(k)
     first_level = make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, top_k, alpha, w)
+    candidates = []
+    pruned = []
+    indexes = []
+    indexes.append(1)
+    candidates.append(len(first_level[0]))
+    pruned.append(len(first_level[0]))
     all_nodes = first_level[1]
     levels.append(first_level[0])
     # cur_lvl - index of current level, correlates with number of slice forming features
     cur_lvl = 1  # currently filled level after first init iteration
-    # currently for debug
-    print("Level 1 had " + str(len(all_features)) + " candidates")
     print()
     print("Current topk are: ")
     top_k.print_topk()
     # combining each candidate of the previous level with every other one until it becomes useless (one node can't make a pair)
     while len(levels[cur_lvl - 1]) > 1:
-        cur_lvl_nodes = []
+        cur_lvl_nodes = {}
         prev_lvl = levels[cur_lvl - 1]
-        for node_i in range(len(prev_lvl)):
+        level_candidates = len(prev_lvl) * (len(prev_lvl) - 1)
+        candidates.append(level_candidates)
+        for node_i in prev_lvl:
             partial = join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug, alpha, w, loss_type,
                                 b_update, cur_lvl, all_nodes, top_k, cur_lvl_nodes)
             cur_lvl_nodes = partial[0]
             all_nodes = partial[1]
         cur_lvl = cur_lvl + 1
+        indexes.append(cur_lvl)
         levels.append(cur_lvl_nodes)
-        top_k.print_topk()
-        print("Level " + str(cur_lvl) + " had " + str(len(prev_lvl) * (len(prev_lvl) - 1)) +
+        print("Level " + str(cur_lvl) + " had " + str(candidates) +
               " candidates but after pruning only " + str(len(cur_lvl_nodes)) + " go to the next level")
-    print("Program stopped at level " + str(cur_lvl + 1))
+        pruned.append(len(cur_lvl_nodes))
+        print()
+        print("Current topk are: ")
+        top_k.print_topk()
+    plt.plot(indexes, candidates, 'r--',
+             indexes, pruned, 'g--')
+    plt.xlabel('Level')
+    plt.ylabel('Number of slices')
+    plt.show()
+    print("Program stopped at level " + str(cur_lvl))
     print()
     print("Selected slices are: ")
     top_k.print_topk()
+    print("candidates:")
+    print(candidates)
+    print(">>>>>>>>>")
+    print("pruned:")
+    print(pruned)
+    return top_k
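
Since process now returns the Topk object, callers can consume the selected slices programmatically instead of parsing the printed output. A hedged usage sketch; the keyword names after errors are assumed from the parameter list partially visible in this diff:

    # Sketch: run the enumerator end-to-end and inspect the result.
    top_k = process(all_features, complete_x, loss, x_size, y_test, errors,
                    debug=True, alpha=4, k=10, w=0.5, loss_type=0,
                    b_update=True)
    top_k.print_topk()  # the returned object can now be asserted on in tests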
diff --git a/scripts/staging/slicing/base/top_k.py b/scripts/staging/slicing/base/top_k.py
index 3957fea..38c6545 100644
--- a/scripts/staging/slicing/base/top_k.py
+++ b/scripts/staging/slicing/base/top_k.py
@@ -51,8 +51,8 @@ class Topk:
         for candidate in self.slices:
             print(candidate.name + ": " + "score = " + str(candidate.score) + "; size = " + str(candidate.size))
 
-    def buckets_top_k(self, cur_lvl_slices, x_size, alpha):
+    def buckets_top_k(self, cur_lvl_slices, x_size, alpha, cur_min):
         for bucket in cur_lvl_slices:
-            if bucket.check_constraint(self, x_size, alpha):
+            if bucket.size > x_size / alpha and bucket.score >= cur_min:
                 self.add_new_top_slice(bucket)
         return self
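
The bucket filter now takes the threshold as an argument rather than reading it from self, which lets the driver freeze the value per level. A minimal sketch of the driver-side pattern that the parallel modules in this commit follow:

    # Freeze the minimum score before a level is processed so that every
    # bucket of that level is judged against the same threshold, even if
    # top_k is updated while the level's results are folded in.
    cur_min = top_k.min_score
    top_k = top_k.buckets_top_k(prev_level, x_size, alpha, cur_min)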
diff --git a/scripts/staging/slicing/base/union_slicer.py b/scripts/staging/slicing/base/union_slicer.py
index 78b7d2b..8ec64d5 100644
--- a/scripts/staging/slicing/base/union_slicer.py
+++ b/scripts/staging/slicing/base/union_slicer.py
@@ -22,6 +22,7 @@
 from slicing.base.node import Node
 from slicing.base.top_k import Topk
 from slicing.base.slicer import opt_fun, union
+import matplotlib.pyplot as plt
 
 
 def check_attributes(left_node, right_node):
@@ -69,6 +70,12 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
     first_level = make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, w, alpha, top_k)
     # double appending of first level nodes in order to enumerate the second level in the same way as the others
     levels.append((first_level[0], len(all_features)))
+    candidates = []
+    pruned = []
+    indexes = []
+    indexes.append(1)
+    candidates.append(len(first_level[0]))
+    pruned.append(len(first_level[0]))
     all_nodes = first_level[1]
     # cur_lvl - index of current level, correlates with number of slice forming features
     cur_lvl = 1  # level that is planned to be filled later
@@ -82,12 +89,14 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
     while len(cur_lvl_nodes) > 0:
         cur_lvl_nodes = []
         count = 0
+        prev_lvl = levels[cur_lvl - 1]
         for left in range(int(cur_lvl / 2) + 1):
             right = cur_lvl - 1 - left
             for node_i in range(len(levels[left][0])):
                 for node_j in range(len(levels[right][0])):
                     flag = check_attributes(levels[left][0][node_i], levels[right][0][node_j])
-                    if flag:
+                    required_number = len(union(levels[left][0][node_i].attributes, levels[right][0][node_j].attributes))
+                    if flag and required_number == cur_lvl + 1:
                         new_node = Node(complete_x, loss, x_size, y_test, errors)
                         parents_set = set(new_node.parents)
                         parents_set.add(levels[left][0][node_i])
@@ -115,24 +124,39 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
                             all_nodes[new_node.key[1]] = new_node
                            # check if concrete data should be extracted or not (only for those whose score
                            # upper bound is big enough and whose subset size is big enough)
-                            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+                            to_slice = new_node.check_bounds(x_size, alpha, top_k.min_score)
                             if to_slice:
                                 new_node.process_slice(loss_type)
                                 new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w)
                                # we decide whether to add the node to the current level nodes (in order
                                # to make new combinations on the next one or not) based on its score value
-                                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                                if new_node.check_constraint(top_k, x_size, alpha, top_k.min_score) and new_node.key \
+                                        not in top_k.keys:
                                     top_k.add_new_top_slice(new_node)
                                 cur_lvl_nodes.append(new_node)
                             if debug:
                                 new_node.print_debug(top_k, cur_lvl)
             count = count + levels[left][1] * levels[right][1]
+        cur_lvl = cur_lvl + 1
+        indexes.append(cur_lvl)
         print("Level " + str(cur_lvl) + " had " + str(count) +
               " candidates but after pruning only " + str(len(cur_lvl_nodes)) + " go to the next level")
-        cur_lvl = cur_lvl + 1
         levels.append((cur_lvl_nodes, count))
+        candidates.append(count)
+        pruned.append(len(cur_lvl_nodes))
         top_k.print_topk()
+    plt.plot(indexes, candidates, 'r--',
+             indexes, pruned, 'g--')
+    plt.xlabel('Level')
+    plt.ylabel('Number of slices')
+    plt.show()
     print("Program stopped at level " + str(cur_lvl))
     print()
     print("Selected slices are: ")
     top_k.print_topk()
+    print("candidates:")
+    print(candidates)
+    print(">>>>>>>>>")
+    print("pruned:")
+    print(pruned)
+    return top_k
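
The union enumerator gets the same candidate guard as the join path: a pair is expanded only when the union of the parents' attributes has exactly cur_lvl + 1 entries, i.e. the parents are attribute-disjoint and the child lands on the next level. A hedged sketch of the guard (node_a and node_b are illustrative stand-ins):

    # Sketch of the added validity check in union enumeration:
    merged = union(node_a.attributes, node_b.attributes)
    valid = check_attributes(node_a, node_b) and len(merged) == cur_lvl + 1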
diff --git a/scripts/staging/slicing/spark_modules/join_data_parallel.py b/scripts/staging/slicing/spark_modules/join_data_parallel.py
index 4f2ff0e..f55b0ca 100644
--- a/scripts/staging/slicing/spark_modules/join_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/join_data_parallel.py
@@ -29,7 +29,8 @@ from slicing.spark_modules.spark_utils import approved_join_slice
 
 
 def rows_mapper(row, buckets, loss_type):
-    filtered = dict(filter(lambda bucket: all(attr in row[1] for attr in bucket[1].attributes), buckets.items()))
+    # filtered = dict(filter(lambda bucket: all(attr in row[1] for attr in bucket[1].attributes), buckets.items()))
+    filtered = dict(filter(lambda bucket: all(attr in row[0] for attr in bucket[1].attributes), buckets.items()))
     for item in filtered:
         filtered[item].update_metrics(row, loss_type)
     return filtered
@@ -79,8 +80,8 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
         bucket = Bucket(node, cur_lvl, w, x_size, loss)
         buckets[bucket.name] = bucket
     b_buckets = SparkContext.broadcast(sc, buckets)
-    rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2]))\
-        .map(lambda item: (item[0], item[1].tolist(), item[2]))
+    rows = predictions.rdd.map(lambda row: (row[1].indices, row[2]))\
+        .map(lambda item: list(item))
     mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
     flattened = mapped.flatMap(lambda line: (line.items()))
     reduced = flattened.combineByKey(combiner, merge_values, merge_combiners)
@@ -90,10 +91,11 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
         cur_lvl_nodes.map(lambda bucket: bucket.print_debug(b_topk.value)).collect()
     cur_lvl = 1
     prev_level = cur_lvl_nodes.collect()
-    top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+    top_k = top_k.buckets_top_k(prev_level, x_size, alpha, 1)
     while len(prev_level) > 0:
         b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
         b_topk = SparkContext.broadcast(sc, top_k)
+        cur_min = top_k.min_score
         b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
         top_k.print_topk()
         buckets = join_enum(prev_level, cur_lvl, x_size, alpha, top_k, w, loss)
@@ -109,12 +111,13 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
             .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value))\
             .collect()
         cur_lvl += 1
-        top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+        top_k = top_k.buckets_top_k(prev_level, x_size, alpha, cur_min)
         print("Level " + str(cur_lvl) + " had " + str(
-            len(b_cur_lvl_nodes.value * (len(b_cur_lvl_nodes.value) - 1)))+" candidates but after pruning only " +
+            len(b_cur_lvl_nodes.value) * (len(prev_level) - 1))+" candidates but after pruning only " +
               str(len(prev_level)) + " go to the next level")
-        print("Program stopped at level " + str(cur_lvl))
+        top_k.print_topk()
     print()
+    print("Program stopped at level " + str(cur_lvl - 1))
     print("Selected slices are: ")
     top_k.print_topk()
     return None
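
The mapper change above reflects a new row layout: rows are now built as (indices, error) pairs rather than (id, indices, error) triples, so the encoded feature indices live at row[0]. A small worked example with hypothetical values:

    # Hypothetical encoded row after the new rdd mapping: the first field
    # holds the one-hot feature indices, the second the per-row error.
    row = [[0, 3, 7], 0.42]
    bucket_attrs = [3, 7]  # attributes of one candidate bucket
    # rows_mapper keeps the bucket because all of its attributes appear
    # in the row's feature indices.
    matches = all(attr in row[0] for attr in bucket_attrs)  # True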
diff --git a/scripts/staging/slicing/spark_modules/spark_slicer.py b/scripts/staging/slicing/spark_modules/spark_slicer.py
index 83de579..9da5b98 100644
--- a/scripts/staging/slicing/spark_modules/spark_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_slicer.py
@@ -73,7 +73,7 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
                                                                                           b_topk.value, w, loss_type)) \
         .map(lambda node: (node.key, node)).collect()
     first_level.update(init_slices)
-    update_top_k(first_level, top_k, alpha, predictions)
+    update_top_k(first_level, top_k, alpha, predictions, 1)
     prev_level = SparkContext.broadcast(sc, first_level)
     levels.append(prev_level)
     cur_lvl = cur_lvl + 1
@@ -82,20 +82,22 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
     while len(levels[cur_lvl - 1].value) > 0:
         nodes_list = {}
         b_topk = SparkContext.broadcast(sc, top_k)
+        cur_min = top_k.min_score
         partitions = sc.parallelize(levels[cur_lvl - 1].value.values())
         mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(nodes, levels[cur_lvl - 1].value.values(),
                                                                                predictions, loss, b_topk.value, alpha, k, w,
-                                                                               loss_type, cur_lvl, debug, enumerator))
+                                                                               loss_type, cur_lvl, debug, enumerator, cur_min))
         flattened = mapped.flatMap(lambda node: node)
         nodes_list.update(flattened.map(lambda node: (node.key, node)).distinct().collect())
         prev_level = SparkContext.broadcast(sc, nodes_list)
         levels.append(prev_level)
-        update_top_k(nodes_list, top_k, alpha, predictions)
+        update_top_k(nodes_list, top_k, alpha, predictions, cur_min)
         cur_lvl = cur_lvl + 1
         b_topk.value.print_topk()
         print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
               " candidates but after pruning only " + str(len(nodes_list)) + " go to the next level")
-    print("Program stopped at level " + str(cur_lvl))
+    print("Program stopped at level " + str(cur_lvl - 1))
     print()
     print("Selected slices are: ")
     top_k.print_topk()
+    return top_k
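
With the added return value, a caller can hold on to the resulting Topk. A hedged usage sketch, mirroring the call site visible later in this diff (f_l2 and the other arguments are taken from the salary test):

    # Sketch: run the Spark join enumerator and reuse the returned Topk.
    top_k = spark_slicer.parallel_process(all_features, predictions, f_l2,
                                          sparkContext, debug=debug, alpha=alpha,
                                          k=k, w=w, loss_type=loss_type,
                                          enumerator=enumerator)
    top_k.print_topk()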
diff --git a/scripts/staging/slicing/spark_modules/spark_union_slicer.py b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
index 02ee716..94172de 100644
--- a/scripts/staging/slicing/spark_modules/spark_union_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
@@ -40,7 +40,7 @@ def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type,
         .map(lambda node: (node.key, node)) \
         .collect()
     first_level.update(init_slices)
-    update_top_k(first_level, top_k, alpha, predictions)
+    update_top_k(first_level, top_k, alpha, predictions, 1)
     prev_level = SparkContext.broadcast(sc, first_level)
     levels.append(prev_level)
     cur_lvl = 1
@@ -48,24 +48,26 @@ def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type,
     while len(levels[cur_lvl - 1].value) > 0:
         cur_lvl_res = {}
         b_topk = SparkContext.broadcast(sc, top_k)
+        cur_min = top_k.min_score
         for left in range(int(cur_lvl / 2) + 1):
             right = cur_lvl - left - 1
             partitions = sc.parallelize(levels[left].value.values())
             mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(nodes, levels[right].value.values(),
                                                                                    predictions, loss, b_topk.value, alpha, k,
                                                                                    w, loss_type, cur_lvl, debug,
-                                                                                   enumerator))
+                                                                                   enumerator, cur_min))
             flattened = mapped.flatMap(lambda node: node)
             partial = flattened.map(lambda node: (node.key, node)).collect()
             cur_lvl_res.update(partial)
         prev_level = SparkContext.broadcast(sc, cur_lvl_res)
         levels.append(prev_level)
-        update_top_k(cur_lvl_res, top_k, alpha, predictions)
+        update_top_k(cur_lvl_res, top_k, alpha, predictions, cur_min)
         cur_lvl = cur_lvl + 1
         top_k.print_topk()
         print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
               " candidates but after pruning only " + str(len(prev_level.value)) + " go to the next level")
-    print("Program stopped at level " + str(cur_lvl))
+    print("Program stopped at level " + str(cur_lvl - 1))
     print()
     print("Selected slices are: ")
     top_k.print_topk()
+    return top_k
diff --git a/scripts/staging/slicing/spark_modules/spark_utils.py b/scripts/staging/slicing/spark_modules/spark_utils.py
deleted file mode 100644
index cb83c71..0000000
--- a/scripts/staging/slicing/spark_modules/spark_utils.py
+++ /dev/null
@@ -1,138 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-
-from pyspark.sql.functions import udf
-from pyspark.sql.types import FloatType
-
-from slicing.base.SparkNode import SparkNode
-from slicing.base.slicer import opt_fun, union
-
-calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, prediction, type), FloatType())
-model_type_init = udf(lambda type: init_model_type(type))
-
-
-def calc_loss_fun(target, prediction, type):
-    if type == 0:
-        return (prediction - target) ** 2
-    elif type == 1:
-        return float(target == prediction)
-
-
-def init_model_type(model_type):
-    if model_type == "regression":
-        return 0
-    elif model_type == "classification":
-        return 1
-
-
-def approved_join_slice(node_i, node_j, cur_lvl):
-    commons = len(list(set(node_i.attributes) & set(node_j.attributes)))
-    return commons == cur_lvl - 1
-
-
-def approved_union_slice(node_i, node_j):
-    if set(node_i.attributes).intersection(set(node_j.attributes)):
-        return False
-    return True
-
-
-def make_first_level(features, predictions, loss, top_k, w, loss_type):
-    first_level = []
-    # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
-    for feature in features:
-        new_node = SparkNode(loss, predictions)
-        new_node.parents = [feature]
-        new_node.attributes.append(feature)
-        new_node.name = new_node.make_name()
-        new_node.key = new_node.make_key()
-        new_node.process_slice(loss_type)
-        new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
-        new_node.c_upper = new_node.score
-        first_level.append(new_node)
-        new_node.print_debug(top_k, 0)
-    return first_level
-
-
-def process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha, loss_type, w, debug, enumerator):
-    cur_enum_nodes = []
-    for node_j in level:
-        if enumerator == "join":
-            flag = approved_join_slice(node_i, node_j, cur_lvl)
-        else:
-            flag = approved_union_slice(node_i, node_j)
-        if flag and int(node_i.name.split("&&")[0]) < int(node_j.name.split("&&")[0]):
-            new_node = SparkNode(loss, predictions)
-            parents_set = set(new_node.parents)
-            parents_set.add(node_i)
-            parents_set.add(node_j)
-            new_node.parents = list(parents_set)
-            parent1_attr = node_i.attributes
-            parent2_attr = node_j.attributes
-            new_node_attr = union(parent1_attr, parent2_attr)
-            new_node.attributes = new_node_attr
-            new_node.name = new_node.make_name()
-            new_node.key = new_node.make_key()
-            new_node.calc_bounds(cur_lvl, w)
-            to_slice = new_node.check_bounds(top_k, len(predictions), alpha)
-            if to_slice:
-                new_node.process_slice(loss_type)
-                new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
-                if new_node.check_constraint(top_k, len(predictions), alpha):
-                    cur_enum_nodes.append(new_node)
-            if debug:
-                new_node.print_debug(top_k, cur_lvl)
-    return cur_enum_nodes
-
-
-def nodes_enum(nodes, level, predictions, loss, top_k, alpha, k, w, loss_type, cur_lvl, debug, enumerator):
-    cur_enum_nodes = []
-    for node_i in nodes:
-        partial_nodes = process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha,
-                                     loss_type, w, debug, enumerator)
-        cur_enum_nodes.append(partial_nodes)
-    return cur_enum_nodes
-
-
-def init_top_k(first_level, top_k, alpha, predictions):
-    # driver updates topK
-    for sliced in first_level:
-        if sliced[1].check_constraint(top_k, len(predictions), alpha):
-            # this method updates top k slices if needed
-            top_k.add_new_top_slice(sliced[1])
-
-
-def update_top_k(new_slices, top_k, alpha, predictions):
-    # driver updates topK
-    for sliced in new_slices.values():
-        if sliced.check_constraint(top_k, len(predictions), alpha):
-            # this method updates top k slices if needed
-            top_k.add_new_top_slice(sliced)
-
-
-def calc_bucket_metrics(bucket, loss, w, x_size, cur_lvl):
-    bucket.calc_error()
-    bucket.score = opt_fun(bucket.error, bucket.size, loss, x_size, w)
-    if cur_lvl == 0:
-        bucket.s_upper = bucket.size
-        bucket.c_upper = bucket.score
-        bucket.s_lower = 1
-    return bucket
diff --git a/scripts/staging/slicing/spark_modules/union_data_parallel.py b/scripts/staging/slicing/spark_modules/union_data_parallel.py
index dcd1af3..5ea70cc 100644
--- a/scripts/staging/slicing/spark_modules/union_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/union_data_parallel.py
@@ -70,8 +70,10 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
         bucket = Bucket(node, cur_lvl, w, x_size, loss)
         buckets[bucket.name] = bucket
     b_buckets = SparkContext.broadcast(sc, buckets)
-    rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
-        .map(lambda item: (item[0], item[1].tolist(), item[2]))
+    # rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
+    #     .map(lambda item: (item[0], item[1].tolist(), item[2]))
+    rows = predictions.rdd.map(lambda row: row[1].indices) \
+        .map(lambda item: list(item))
     mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
     flattened = mapped.flatMap(lambda line: (line.items()))
     reduced = flattened.combineByKey(combiner, join_data_parallel.merge_values, join_data_parallel.merge_combiners)
@@ -83,9 +85,10 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
     prev_level = cur_lvl_nodes.collect()
     b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
     levels.append(b_cur_lvl_nodes)
-    top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+    top_k = top_k.buckets_top_k(prev_level, x_size, alpha, 1)
     while len(prev_level) > 0:
         b_topk = SparkContext.broadcast(sc, top_k)
+        cur_min = top_k.min_score
         b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
         top_k.print_topk()
         buckets = []
@@ -104,7 +107,7 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
         partial = flattened.combineByKey(combiner, join_data_parallel.merge_values, join_data_parallel.merge_combiners)
         prev_level = partial\
             .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value)).collect()
-        top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+        top_k = top_k.buckets_top_k(prev_level, x_size, alpha, cur_min)
         b_topk = SparkContext.broadcast(sc, top_k)
         if debug:
             partial.values().map(lambda bucket: bucket.print_debug(b_topk.value)).collect()
diff --git a/scripts/staging/slicing/tests/regression/bd_spark_salary.py b/scripts/staging/slicing/tests/classification/compas_bd.py
similarity index 64%
copy from scripts/staging/slicing/tests/regression/bd_spark_salary.py
copy to scripts/staging/slicing/tests/classification/compas_bd.py
index c32414b..3f1a5b3 100644
--- a/scripts/staging/slicing/tests/regression/bd_spark_salary.py
+++ b/scripts/staging/slicing/tests/classification/compas_bd.py
@@ -23,6 +23,8 @@ import sys
 
 from pyspark import SparkConf, SparkContext
 from pyspark.ml import Pipeline
+from pyspark.ml.classification import RandomForestClassifier
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
 from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, IndexToString
 from pyspark.ml.regression import LinearRegression
 from pyspark.sql import SQLContext
@@ -32,8 +34,6 @@ from sklearn.model_selection import train_test_split
 
 from slicing.spark_modules import spark_utils, join_data_parallel, union_data_parallel
 
-binner = udf(lambda arg: int(int(arg) // 5))
-
 
 if __name__ == "__main__":
     args = sys.argv
@@ -48,43 +48,41 @@ if __name__ == "__main__":
         debug = args[5]
         loss_type = int(args[6])
         enumerator = args[7]
+        dataset = args[8]
     else:
         k = 10
-        w = 0.5
-        alpha = 6
+        w = 0.1
+        alpha = 4
         b_update = True
         debug = True
         loss_type = 0
+        dataset = 'compas-scores-two-years.csv'
         enumerator = "union"
 
-    conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
-    num_partitions = 2
-    model_type = "regression"
-    label = 'salary'
+    conf = SparkConf().setAppName("compas_test").setMaster('local[8]')
+    num_partitions = 8
+    model_type = "classification"
+    label = "is_recid"
     sparkContext = SparkContext(conf=conf)
     sqlContext = SQLContext(sparkContext)
-    fileRDD = sparkContext.textFile('salaries.csv', num_partitions)
-    header = fileRDD.first()
-    head_split = header.split(",")
-    head_split[0] = '_c0'
-    fileRDD = fileRDD.filter(lambda line: line != header)
-    data = fileRDD.map(lambda row: row.split(","))
-    dataset_df = sqlContext.createDataFrame(data, head_split)
+    dataset_df = sqlContext.read\
+        .option("badRecordsPath", "/tmp/badRecordsPath")\
+        .csv(dataset, header='true', inferSchema='true')
+    # fileRDD = sparkContext.textFile(dataset, num_partitions)
+    # header = fileRDD.first()
+    # head_split = header.split(",")
+    # fileRDD = fileRDD.filter(lambda line: line != header)
+    # data = fileRDD.map(lambda row: row.split(","))
+    # dataset_df = sqlContext.createDataFrame(data, head_split)
 
-    cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
-    # initializing stages of main transformation pipeline
+    cat_features = ["sex", "age_cat", "race", "decile_score10", "c_charge_degree", "c_charge_desc"]
     stages = []
-    dataset_df = dataset_df.drop('_c0')
     dataset_df = dataset_df.withColumn("id", sf.monotonically_increasing_id())
-    # bining numeric features by local binner udf function (specified for current dataset if needed)
-    dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
-    dataset_df = dataset_df.withColumn('service_bin', binner(dataset_df['service']))
-    dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
-    dataset_df = dataset_df.drop('sincephd', 'service')
     dataset_df = dataset_df.withColumn('target', dataset_df[label].cast("int"))
-    # hot encoding categorical features
+    dataset_df = dataset_df.withColumn('model_type', sf.lit(1))
     for feature in cat_features:
-        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
+        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index").setHandleInvalid("skip")
         encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
         encoder.setDropLast(False)
         stages += [string_indexer, encoder]
@@ -113,19 +111,26 @@ if __name__ == "__main__":
     train, test = train_test_split(df_transform_fin, test_size=0.3, random_state=0)
     train_df = sqlContext.createDataFrame(train)
     test_df = sqlContext.createDataFrame(test)
-    lr = LinearRegression(featuresCol='features', labelCol='target', maxIter=10, regParam=0.3, elasticNetParam=0.8)
-    lr_model = lr.fit(train_df)
-    eval = lr_model.evaluate(test_df)
-    f_l2 = eval.meanSquaredError
-    pred = eval.predictions
-    pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred['target'], pred['prediction'], pred['model_type']))
+
+    rf = RandomForestClassifier(featuresCol='features', labelCol="target", numTrees=10)
+    rf_model = rf.fit(train_df)
+    predictions = rf_model.transform(test_df)
+    # Select example rows to display.
+    predictions.select("id", "features", "target", "prediction", "model_type")
+    # Select (prediction, true label) and compute test error
+    evaluator = MulticlassClassificationEvaluator(
+        labelCol="target", predictionCol="prediction", metricName="accuracy")
+    accuracy = evaluator.evaluate(predictions)
+    loss = 1.0 - accuracy
+    pred_df_fin = predictions.withColumn('error',
+                                         spark_utils.calc_loss(predictions["target"], predictions['prediction'],
+                                                               predictions['model_type']))
     predictions = pred_df_fin.select('id', 'features', 'error').repartition(num_partitions)
-    converter = IndexToString(inputCol='features', outputCol='cats')
-    all_features = range(predictions.toPandas().values[1][1].size)
-    k = 10
+    all_features = range(predictions.toPandas().values[0][1].size)
+
     if enumerator == "join":
-        join_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
-                                            k=k, w=w, loss_type=loss_type)
+        join_data_parallel.parallel_process(all_features, predictions, loss, sparkContext, debug=debug, alpha=alpha, k=k,
+                                            w=w, loss_type=loss_type)
     elif enumerator == "union":
-        union_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
-                                             k=k, w=w, loss_type=loss_type)
+        union_data_parallel.parallel_process(all_features, predictions, loss, sparkContext, debug=debug, alpha=alpha, k=k,
+                                             w=w, loss_type=loss_type)
diff --git a/scripts/staging/slicing/tests/classification/test_adult.py b/scripts/staging/slicing/tests/classification/test_adult.py
index 9575592..cacb715 100644
--- a/scripts/staging/slicing/tests/classification/test_adult.py
+++ b/scripts/staging/slicing/tests/classification/test_adult.py
@@ -23,6 +23,7 @@ import sys
 
 import pandas as pd
 from sklearn.preprocessing import OneHotEncoder
+from sklearn import svm
 
 from slicing.base import slicer as slicer, union_slicer
 from sklearn.ensemble import RandomForestClassifier
@@ -83,14 +84,8 @@ if __name__ == "__main__":
         complete_y.append((counter, y_test[counter]))
         counter = counter + 1
     x_size = counter
-    clf = RandomForestClassifier(n_jobs=2, random_state=0)
+    clf = svm.SVC()
     clf.fit(x_train, y_train)
-    RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
-                max_depth=None, max_features='auto', max_leaf_nodes=None,
-                min_impurity_split=1e-07, min_samples_leaf=1,
-                min_samples_split=2, min_weight_fraction_leaf=0.0,
-                n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
-                verbose=0, warm_start=False)
 
     # alpha is the size significance coefficient
     # the verbose option returns debug info while slices are created and prints it
diff --git a/scripts/staging/slicing/tests/classification/test_adult.py b/scripts/staging/slicing/tests/classification/test_compas.py
similarity index 73%
copy from scripts/staging/slicing/tests/classification/test_adult.py
copy to scripts/staging/slicing/tests/classification/test_compas.py
index 9575592..7fbb6fc 100644
--- a/scripts/staging/slicing/tests/classification/test_adult.py
+++ b/scripts/staging/slicing/tests/classification/test_compas.py
@@ -19,16 +19,15 @@
 #
 #-------------------------------------------------------------
 
-import sys
-
 import pandas as pd
-from sklearn.preprocessing import OneHotEncoder
-
-from slicing.base import slicer as slicer, union_slicer
 from sklearn.ensemble import RandomForestClassifier
-from sklearn import preprocessing
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import mean_squared_error
 from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+import sys
 
+from slicing.base import slicer, union_slicer
 
 if __name__ == "__main__":
     args = sys.argv
@@ -49,35 +48,23 @@ if __name__ == "__main__":
         alpha = 4
         b_update = True
         debug = True
-        loss_type = 1
+        loss_type = 0
         enumerator = "union"
-    dataset = pd.read_csv('/slicing/datasets/adult.csv')
+    file_name = 'slicing/datasets/real/compas/compas-test.csv'
+    dataset = pd.read_csv(file_name)
     attributes_amount = len(dataset.values[0])
+    y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    # starting with one, not including the id field
     x = dataset.iloc[:, 0:attributes_amount - 1].values
-    y = dataset.iloc[:, attributes_amount - 1]
-    le = preprocessing.LabelEncoder()
-    le.fit(y)
-    y = le.transform(y)
+    # hot encoding of categorical features
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
     complete_x = []
     complete_y = []
     counter = 0
-    all_indexes = []
-    not_encoded_columns = [
-        "Age", "WorkClass", "fnlwgt", "Education", "EducationNum",
-        "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
-        "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income"
-    ]
-    for row in x:
-            row[0] = int(row[0] / 10)
-            row[2] = int(row[2]) // 100000
-            row[4] = int(row[4] / 5)
-            row[10] = int(row[10] / 1000)
-            row[12] = int(row[12] / 10)
-    enc = OneHotEncoder(handle_unknown='ignore')
-    x = enc.fit_transform(x).toarray()
     all_features = enc.get_feature_names()
-    x_size = len(complete_x)
-    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
+    # train model on the whole dataset
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
     for item in x_test:
         complete_x.append((counter, item))
         complete_y.append((counter, y_test[counter]))
@@ -86,18 +73,11 @@ if __name__ == "__main__":
     clf = RandomForestClassifier(n_jobs=2, random_state=0)
     clf.fit(x_train, y_train)
     RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
-                max_depth=None, max_features='auto', max_leaf_nodes=None,
-                min_impurity_split=1e-07, min_samples_leaf=1,
-                min_samples_split=2, min_weight_fraction_leaf=0.0,
-                n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
-                verbose=0, warm_start=False)
-
-    # alpha is size significance coefficient
-    # verbose option is for returning debug info while creating slices and printing it
-    # k is number of top-slices we want
-    # w is a weight of error function significance (1 - w) is a size significance propagated into optimization function
-    # loss_type = 0 (l2 in case of regression model
-    # loss_type = 1 (cross entropy in case of classification model)
+                           max_depth=None, max_features='auto', max_leaf_nodes=None,
+                           min_impurity_split=1e-07, min_samples_leaf=1,
+                           min_samples_split=2, min_weight_fraction_leaf=0.0,
+                           n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
+                           verbose=0, warm_start=False)
     preds = clf.predict(x_test)
     predictions = []
     counter = 0
@@ -108,6 +88,10 @@ if __name__ == "__main__":
             mistakes = mistakes + 1
         counter = counter + 1
     lossF = mistakes / counter
+    # alpha is the size significance coefficient
+    # the verbose option returns debug info while slices are created and prints it
+    # k is the number of top slices we want
+    # w is the weight of the error term; (1 - w) is the size weight propagated into the optimization function
 
     # enumerator <union>/<join> indicates an approach of next level slices combination process:
     # in case of <join> in order to create new node of current level slicer
diff --git a/scripts/staging/slicing/tests/regression/bd_spark_salary.py b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
index c32414b..1889292 100644
--- a/scripts/staging/slicing/tests/regression/bd_spark_salary.py
+++ b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
@@ -48,6 +48,7 @@ if __name__ == "__main__":
         debug = args[5]
         loss_type = int(args[6])
         enumerator = args[7]
+        dataset = args[8]
     else:
         k = 10
         w = 0.5
@@ -55,18 +56,18 @@ if __name__ == "__main__":
         b_update = True
         debug = True
         loss_type = 0
-        enumerator = "union"
+        dataset = 'slicing/datasets/parallel_data/salaries/rows1000.csv'
+        enumerator = "join"
 
-    conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
-    num_partitions = 2
+    conf = SparkConf().setAppName("salary_test").setMaster('local[4]')
+    num_partitions = 8
     model_type = "regression"
     label = 'salary'
     sparkContext = SparkContext(conf=conf)
     sqlContext = SQLContext(sparkContext)
-    fileRDD = sparkContext.textFile('salaries.csv', num_partitions)
+    fileRDD = sparkContext.textFile(dataset, num_partitions)
     header = fileRDD.first()
     head_split = header.split(",")
-    head_split[0] = '_c0'
     fileRDD = fileRDD.filter(lambda line: line != header)
     data = fileRDD.map(lambda row: row.split(","))
     dataset_df = sqlContext.createDataFrame(data, head_split)
@@ -74,7 +75,6 @@ if __name__ == "__main__":
     cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
     # initializing stages of main transformation pipeline
     stages = []
-    dataset_df = dataset_df.drop('_c0')
     dataset_df = dataset_df.withColumn("id", sf.monotonically_increasing_id())
     # binning numeric features by local binner udf function (specified for current dataset if needed)
     dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
@@ -121,8 +121,7 @@ if __name__ == "__main__":
     pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred['target'], pred['prediction'], pred['model_type']))
     predictions = pred_df_fin.select('id', 'features', 'error').repartition(num_partitions)
     converter = IndexToString(inputCol='features', outputCol='cats')
-    all_features = range(predictions.toPandas().values[1][1].size)
-    k = 10
+    all_features = list(decode_dict.keys())
     if enumerator == "join":
         join_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
                                             k=k, w=w, loss_type=loss_type)
diff --git a/scripts/staging/slicing/tests/regression/spark_salary.py b/scripts/staging/slicing/tests/regression/spark_salary.py
index 52d0cf2..be9868f 100644
--- a/scripts/staging/slicing/tests/regression/spark_salary.py
+++ b/scripts/staging/slicing/tests/regression/spark_salary.py
@@ -49,36 +49,71 @@ if __name__ == "__main__":
         debug = args[5]
         loss_type = int(args[6])
         enumerator = args[7]
+        dataset = args[8]
     else:
         k = 10
         w = 0.5
-        alpha = 6
+        alpha = 10
         b_update = True
         debug = True
         loss_type = 0
-        enumerator = "join"
+        enumerator = "union"
+        dataset = 'slicing/datasets/salaries.csv'
+        # dataset = 'slicing/datasets/parallel_attr/salaries/attr3000.csv'
 
-    conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
-    num_partitions = 2
+    conf = SparkConf().setAppName("salary_test").setMaster('local[4]')
+    num_partitions = 4
     model_type = "regression"
     label = 'salary'
     sparkContext = SparkContext(conf=conf)
     sqlContext = SQLContext(sparkContext)
-    dataset_df = sqlContext.read.csv('salaries.csv', header='true', inferSchema='true')
+    dataset_df = sqlContext.read.csv(dataset, header='true',
+                                     inferSchema='true')
     # initializing stages of main transformation pipeline
     stages = []
     # list of categorical features for further hot-encoding
-    cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+    cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]  # base
+    # cat_features = ["rank", "discipline", "sex"]  # 10
+    # cat_features = ["rank", "discipline", "sincephd", "sex"]  # 50
+    # cat_features = ["rank", "discipline", "sex", "company", "country", "city", "start", "dept", "card"]  # 100
+    # cat_features = ["rank", "discipline", "sex", "company", "country", "city", "skills", "language", "tz", "card",
+    #                  "uni", "dept", "race", "code", "job", "size", "previous", "stock"]  # 1013
+    # cat_features = ["rank", "discipline", "sex", "company", "country", "city", "skills", "language", "tz", "card",
+    #                     "uni", "dept", "origin", "children", "bmi", "code", "job", "size", "previous", "stock", "market",
+    #                     "freq", "smoker", "region", "WorkClass", "Education", "EducationNum", "MaritalStatus",
+    #                     "Occupation", "Relationship", "color", "sepal_length", "sepal_width", "petal_length",
+    #                     "petal_width", "variety", "symboling", "normalized-losses", "make",	"fuel-type", "aspiration",
+    #                     "num-of-doors",	"body-style", "drive-wheels", "engine-location", "wheel-base", 	"length", "width",
+    #                     "height", "curb-weight", "engine-type", "num-of-cylinders", "engine-size", "fuel-system", "bore",
+    #                     "stroke", "compression-ratio", "horsepower", "peak-rpm", "city-mpg", "highway-mpg",	"price",
+    #                     "codeNumber", "clump", "cellsize", "cellshape", "adhesion", "epitel", "nuclei", "chromatin",
+    #                     "nucleoli", "mitoses", "class", "elevation", "aspect", "slope", "distToHydr"]
+
+#     cat_features = ["rank", "discipline", "sex", "company", "country", "city", "skills", "language", "tz", "card",
+#                     "uni", "dept", "origin", "children", "bmi", "code", "job", "size", "previous", "stock", "market",
+#                     "freq", "smoker", "region", "WorkClass", "Education", "EducationNum", "MaritalStatus",
+#                     "Occupation", "Relationship", "color", "sepal_length", "sepal_width", "petal_length",
+#                     "petal_width", "variety", "symboling", "normalized-losses", "make",	"fuel-type", "aspiration",
+#                     "num-of-doors",	"body-style", "drive-wheels", "engine-location", "wheel-base", 	"length", "width",
+#                     "height", "curb-weight", "engine-type", "num-of-cylinders", "engine-size", "fuel-system", "bore",
+#                     "stroke", "compression-ratio", "horsepower", "peak-rpm", "city-mpg", "highway-mpg",	"price",
+#                     "codeNumber", "clump", "cellsize", "cellshape", "adhesion", "epitel", "nuclei", "chromatin",
+#                     "nucleoli", "mitoses", "class", "elevation", "aspect", "slope", "distToHydr", "distToRoad",
+#                     "hillshadeMorning", "hillSahdeNoon", "hillSadeAfternoon", "distToFire", "DSName", "T", "N", "p",
+#                     "k", "Bin", "Cost",	"Sdratio", "correl", "cancor1",	"cancor2", "fract1", "fract2", "skewness",
+#                     "kurtosis",	"Hc", "Hx", "Mcx", "EnAttr", "NSRation", "Alg", "error"]   # 3000
+
+    # the lines up to line 87 are only needed for the base case
     # removing column with ID field
-    dataset_df = dataset_df.drop('_c0')
+    # dataset_df = dataset_df.drop('_c0')
     # binning numeric features by local binner udf function (specified for current dataset if needed)
     dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
     dataset_df = dataset_df.withColumn('service_bin', binner(dataset_df['service']))
-    dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
     dataset_df = dataset_df.drop('sincephd', 'service')
+    dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
     # hot encoding categorical features
     for feature in cat_features:
-        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
+        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index").setHandleInvalid("skip")
         encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
         encoder.setDropLast(False)
         stages += [string_indexer, encoder]
@@ -112,9 +147,8 @@ if __name__ == "__main__":
     pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label], pred['prediction'], pred['model_type']))
     predictions = pred_df_fin.select('features', 'error').repartition(num_partitions)
     converter = IndexToString(inputCol='features', outputCol='cats')
-    all_features = range(predictions.toPandas().values[0][0].size)
+    all_features = list(decode_dict.keys())
     predictions = predictions.collect()
-    k = 10
     if enumerator == "join":
         spark_slicer.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha, k=k, w=w,
                                       loss_type=loss_type, enumerator=enumerator)
diff --git a/scripts/staging/slicing/tests/regression/toy_tests.py b/scripts/staging/slicing/tests/regression/toy_tests.py
new file mode 100644
index 0000000..876aa8b
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/toy_tests.py
@@ -0,0 +1,428 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import unittest
+
+import pandas as pd
+from pyspark import SparkConf, SparkContext, SQLContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+import pyspark.sql.functions as sf
+from sklearn import linear_model
+from sklearn.metrics import mean_squared_error, r2_score
+from sklearn.preprocessing import OneHotEncoder
+
+import slicing.base.slicer as slicer
+from slicing.base import union_slicer
+from slicing.base.node import Node
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
+
+
+class SliceTests(unittest.TestCase):
+    loss_type = 0
+    # alternatively, generate the toy data on the fly:
+    # x, y = model_generator.generate_dataset(10, 100)
+    train_dataset = pd.read_csv("toy_train.csv")
+    attributes_amount = len(train_dataset.values[0])
+    model = linear_model.LinearRegression()
+    y_train = train_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    x_train = train_dataset.iloc[:, 0:attributes_amount - 1].values
+    model.fit(x_train, y_train)
+    test_dataset = pd.read_csv("toy.csv")
+    y_test = test_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    x_test = test_dataset.iloc[:, 0:attributes_amount - 1].values
+    y_pred = model.predict(x_test)
+    print("Mean squared error: %.2f"
+          % mean_squared_error(y_test, y_pred))
+    print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+    # print the coefficients learned by the model
+    print('Coefficients: \n', model.coef_)
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x_test).toarray()
+    complete_x = []
+    complete_y = []
+    counter = 0
+    for item in x:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    all_features = enc.get_feature_names()
+    loss = mean_squared_error(y_test, y_pred)
+    devs = (y_pred - y_test) ** 2
+    errors = []
+    counter = 0
+    for pred in devs:
+        errors.append((counter, pred))
+        counter = counter + 1
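+    # errors holds (row id, squared error) pairs aligned with complete_x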
+    k = 5
+    w = 0.5
+    alpha = 4
+    top_k = Topk(k)
+    debug = True
+    b_update = True
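+    # k: number of top slices kept; w: weight of the error term vs. the size
+    # term in the scoring function (w=0 scores by size only, w=1 by error only);
+    # alpha sets the minimum slice size to len(complete_x) / alpha, below which
+    # slices are pruned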
+    first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+                                          loss_type, top_k, alpha, w)
+    first_level_nodes = first_level[0]
+    slice_member = first_level_nodes[(7, 'x2_2')]
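+    # first-level nodes are keyed by (one-hot position, 'feature_value');
+    # (7, 'x2_2') selects the slice where the third attribute equals 2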
+
+    def test_attr_spark(self):
+        conf = SparkConf().setAppName("toy_test").setMaster('local[2]')
+        num_partitions = 2
+        enumerator = "join"
+        model_type = "regression"
+        label = 'target'
+        sparkContext = SparkContext(conf=conf)
+        sqlContext = SQLContext(sparkContext)
+        train_df = sqlContext.read.csv("toy_train.csv", header='true',
+                            inferSchema='true')
+        test_df = sqlContext.read.csv("toy.csv", header='true',
+                            inferSchema='true')
+        # initializing stages of main transformation pipeline
+        stages = []
+        # list of categorical features for further hot-encoding
+        cat_features = ['a', 'b', 'c']
+        for feature in cat_features:
+            string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index").setHandleInvalid("skip")
+            encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+            encoder.setDropLast(False)
+            stages += [string_indexer, encoder]
+        assembler_inputs = [feature + "_vec" for feature in cat_features]
+        assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")
+        stages += [assembler]
+        assembler_final = VectorAssembler(inputCols=["assembled_inputs"], outputCol="features")
+        stages += [assembler_final]
+        pipeline = Pipeline(stages=stages)
+        train_pipeline_model = pipeline.fit(train_df)
+        test_pipeline_model = pipeline.fit(test_df)
+        train_df_transformed = train_pipeline_model.transform(train_df)
+        test_df_transformed = test_pipeline_model.transform(test_df)
+        train_df_transformed = train_df_transformed.withColumn('model_type', sf.lit(0))
+        test_df_transformed = test_df_transformed.withColumn('model_type', sf.lit(0))
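+        # note: the pipeline is fit separately on the train and test splits;
+        # decode_dict below therefore reflects the test-side index assignments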
+        decode_dict = {}
+        counter = 0
+        cat = 0
+        for feature in cat_features:
+            colIdx = test_df_transformed.select(feature, feature + "_index").distinct().rdd.collectAsMap()
+            colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: item[1])}
+            for item in colIdx:
+                decode_dict[counter] = (cat, item, colIdx[item], counter)
+                counter = counter + 1
+            cat = cat + 1
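+        # decode_dict: one-hot position -> (feature id, category value,
+        # index within the feature, global position)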
+        train_df_transform_fin = train_df_transformed.select('features', label, 'model_type')
+        test_df_transform_fin = test_df_transformed.select('features', label, 'model_type')
+        lr = LinearRegression(featuresCol='features', labelCol=label, maxIter=10, regParam=0.0, elasticNetParam=0.8)
+        lr_model = lr.fit(train_df_transform_fin)
+        evaluation = lr_model.evaluate(test_df_transform_fin)
+        f_l2 = evaluation.meanSquaredError
+        pred = evaluation.predictions
+        pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label], pred['prediction'], pred['model_type']))
+        predictions = pred_df_fin.select('features', 'error').repartition(num_partitions)
+        converter = IndexToString(inputCol='features', outputCol='cats')
+        all_features = list(decode_dict)
+        predictions = predictions.collect()
+        spark_join = spark_slicer.parallel_process(all_features, predictions, f_l2, sparkContext, debug=self.debug, alpha=self.alpha,
+                                      k=self.k, w=self.w, loss_type=self.loss_type, enumerator="join")
+        spark_union = spark_union_slicer.process(all_features, predictions, f_l2, sparkContext, debug=self.debug, alpha=self.alpha,
+                                      k=self.k, w=self.w, loss_type=self.loss_type, enumerator="union")
+        self.assertEqual(3, len(spark_join.slices))
+        print("check1")
+        self.assertEqual(spark_join.min_score, spark_union.min_score)
+        print("check2")
+        self.assertEqual(spark_join.keys, spark_union.keys)
+        print("check3")
+        self.assertEqual(len(spark_join.slices), len(spark_union.slices))
+        print("check4")
+        idx = -1
+        for sliced in spark_join.slices:
+            idx += 1
+            self.assertEqual(sliced.score, spark_union.slices[idx].score)
+        print("check5")
+
+    def test_features_number(self):
+        self.assertEqual(len(self.all_features), 9)
+        print("check 1")
+
+    def test_base_first_level(self):
+        self.assertEqual(9, len(self.first_level_nodes))
+        print("check 2")
+
+    def test_parents_first(self):
+        self.assertIn(('x2_2', 7), self.slice_member.parents)
+        print("check 3")
+
+    def test_name(self):
+        self.assertEqual('x2_2', self.slice_member.make_name())
+        print("check 4")
+
+    def test_size(self):
+        self.assertEqual(36, self.slice_member.size)
+        print("check 5")
+
+    def test_e_upper(self):
+        self.assertEqual(81, self.slice_member.e_upper)
+        print("check 6")
+
+    def test_loss(self):
+        self.assertEqual(22, int(self.slice_member.loss))
+        print("check 7")
+
+    def test_opt_fun(self):
+        self.slice_member.score = slicer.opt_fun(self.slice_member.loss, self.slice_member.size, self.loss, len(self.x_test), self.w)
+        self.assertEqual(1.2673015873015872, self.slice_member.score)
+        print("check 8")
+
+    def test_score(self):
+        self.assertEqual(1.2673015873015872, self.slice_member.score)
+        print("check 9")
+
+    def test_base_join_enum(self):
+        cur_lvl_nodes = {}
+        all_nodes = {}
+        b_update = True
+        cur_lvl = 1
+        slice_index = (2, 'x0_3')
+        combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+                               len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+                               self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+        self.assertEqual(6, len(combined[0]))
+        print("check1")
+
+    def test_parents_second(self):
+        cur_lvl_nodes = {}
+        all_nodes = {}
+        b_update = True
+        cur_lvl = 1
+        slice_index = (2, 'x0_3')
+        combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+                                    len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+                                    self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+        parent1 = combined[0][('x0_3 && x1_3')]
+        parent2 = combined[0][('x0_3 && x2_2')]
+        new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+        new_node.parents = [parent1, parent2]
+        parent1_attr = parent1.attributes
+        parent2_attr = parent2.attributes
+        new_node_attr = slicer.union(parent1_attr, parent2_attr)
+        self.assertEqual(new_node_attr, [('x0_3', 2), ('x1_3', 5), ('x2_2', 7)])
+        print("check2")
+
+    def test_nonsense(self):
+        cur_lvl_nodes = {}
+        all_nodes = {}
+        b_update = True
+        cur_lvl = 1
+        slice_index = (2, 'x0_3')
+        combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+                                    len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+                                    self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+        parent1 = combined[0][('x0_3 && x1_3')]
+        parent2 = combined[0][('x0_3 && x2_2')]
+        new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+        new_node.parents = [parent1, parent2]
+        parent1_attr = parent1.attributes
+        parent2_attr = parent2.attributes
+        new_node_attr = slicer.union(parent1_attr, parent2_attr)
+        new_node.attributes = new_node_attr
+        new_node.name = new_node.make_name()
+        flag_true = slicer.slice_name_nonsense(parent1, parent2, 2)
+        self.assertTrue(flag_true)
+        print("check3")
+
+    def test_non_nonsense(self):
+        cur_lvl_nodes = {}
+        all_nodes = {}
+        b_update = True
+        cur_lvl = 1
+        slice_index = (2, 'x0_3')
+        parent3 = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+        parent3.parents = [self.first_level_nodes[(4, 'x1_2')], self.first_level_nodes[(7, 'x2_2')]]
+        parent3.attributes = [('x1_2', 4), ('x2_2', 7)]
+        combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+                                    len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+                                    self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+        parent2 = combined[0]['x0_3 && x2_3']
+        parent3.key = (8, 'x1_2 && x2_2')
+        flag_nonsense = slicer.slice_name_nonsense(parent2, parent3, 2)
+        self.assertTrue(flag_nonsense)
+        print("check4")
+
+    def test_uppers(self):
+        cur_lvl_nodes = {}
+        all_nodes = {}
+        b_update = True
+        cur_lvl = 1
+        slice_index = (2, 'x0_3')
+        parent3 = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+        parent3.parents = [self.first_level_nodes[(4, 'x1_2')], self.first_level_nodes[(7, 'x2_2')]]
+        parent3.attributes = [('x1_2', 4), ('x2_2', 7)]
+        combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+                                    len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+                                    self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+        parent1 = combined[0]['x0_3 && x1_3']
+        parent2 = combined[0]['x0_3 && x2_3']
+        new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+        new_node.parents = [parent1, parent2]
+        new_node.calc_bounds(2, self.w)
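+        # calc_bounds derives the candidate's size bound (s_upper) and score
+        # bound (c_upper) from its parents; these feed the check_bounds pruning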
+        self.assertEqual(25, new_node.s_upper)
+        print("check5")
+        self.assertEqual(398, int(new_node.c_upper))
+        print("check6")
+
+    def test_topk_slicing(self):
+        join_top_k = slicer.process(self.all_features, self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors,
+                       self.debug, self.alpha, self.k, self.w, self.loss_type, self.b_update)
+        union_top_k = union_slicer.process(self.all_features, self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors,
+                       self.debug, self.alpha, self.k, self.w, self.loss_type, self.b_update)
+        self.assertEqual(join_top_k.min_score, union_top_k.min_score)
+        print("check1")
+        self.assertEqual(join_top_k.keys, union_top_k.keys)
+        print("check2")
+        self.assertEqual(len(join_top_k.slices), len(union_top_k.slices))
+        print("check3")
+        idx = -1
+        for sliced in join_top_k.slices:
+            idx += 1
+            self.assertEqual(sliced.score, union_top_k.slices[idx].score)
+        print("check4")
+
+    def test_extreme_target(self):
+        test_dataset = pd.read_csv("/home/lana/diploma/project/slicing/datasets/toy_extreme_change.csv")
+        y_test = test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
+        x_test = test_dataset.iloc[:, 0:self.attributes_amount - 1].values
+        y_pred = self.model.predict(x_test)
+        print("Mean squared error: %.2f"
+              % mean_squared_error(y_test, y_pred))
+        print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+        # print the coefficients learned by the model
+        print('Coefficients: \n', self.model.coef_)
+        enc = OneHotEncoder(handle_unknown='ignore')
+        x = enc.fit_transform(x_test).toarray()
+        complete_x = []
+        complete_y = []
+        counter = 0
+        for item in x:
+            complete_x.append((counter, item))
+            complete_y.append((counter, y_test[counter]))
+            counter = counter + 1
+        all_features = enc.get_feature_names()
+        loss = mean_squared_error(y_test, y_pred)
+        devs = (y_pred - y_test) ** 2
+        errors = []
+        counter = 0
+        for pred in devs:
+            errors.append((counter, pred))
+            counter = counter + 1
+        k = 5
+        w = 0.5
+        alpha = 4
+        top_k = Topk(k)
+        debug = True
+        b_update = True
+        first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+                                              self.loss_type, top_k, alpha, w)
+        first_level_nodes = first_level[0]
+        slice_member = first_level_nodes[(7, 'x2_2')]
+        self.assertGreater(slice_member.loss, self.slice_member.loss)
+        print("check 1")
+        self.assertGreater(slice_member.score, self.slice_member.score)
+        print("check 2")
+
+    def test_error_significance(self):
+        y_test = self.test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
+        x_test = self.test_dataset.iloc[:, 0:self.attributes_amount - 1].values
+        y_pred = self.model.predict(x_test)
+        print("Mean squared error: %.2f"
+              % mean_squared_error(y_test, y_pred))
+        print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+        # print the coefficients learned by the model
+        print('Coefficients: \n', self.model.coef_)
+        enc = OneHotEncoder(handle_unknown='ignore')
+        x = enc.fit_transform(x_test).toarray()
+        complete_x = []
+        complete_y = []
+        counter = 0
+        for item in x:
+            complete_x.append((counter, item))
+            complete_y.append((counter, y_test[counter]))
+            counter = counter + 1
+        all_features = enc.get_feature_names()
+        loss = mean_squared_error(y_test, y_pred)
+        devs = (y_pred - y_test) ** 2
+        errors = []
+        counter = 0
+        for pred in devs:
+            errors.append((counter, pred))
+            counter = counter + 1
+        k = 5
+        # Maximized size significance (w = 0 puts all weight on the size term)
+        w = 0
+        alpha = 4
+        top_k = Topk(k)
+        debug = True
+        b_update = True
+        first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+                                              self.loss_type, top_k, alpha, w)
+        first_level_nodes = first_level[0]
+        slice_member = first_level_nodes[(7, 'x2_2')]
+        self.assertGreater(self.slice_member.score, slice_member.score)
+
+    def test_size_significance(self):
+        y_test = self.test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
+        x_test = self.test_dataset.iloc[:, 0:self.attributes_amount - 1].values
+        y_pred = self.model.predict(x_test)
+        print("Mean squared error: %.2f"
+                  % mean_squared_error(y_test, y_pred))
+        print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+        # print the coefficients learned by the model
+        print('Coefficients: \n', self.model.coef_)
+        enc = OneHotEncoder(handle_unknown='ignore')
+        x = enc.fit_transform(x_test).toarray()
+        complete_x = []
+        complete_y = []
+        counter = 0
+        for item in x:
+            complete_x.append((counter, item))
+            complete_y.append((counter, y_test[counter]))
+            counter = counter + 1
+        all_features = enc.get_feature_names()
+        loss = mean_squared_error(y_test, y_pred)
+        devs = (y_pred - y_test) ** 2
+        errors = []
+        counter = 0
+        for pred in devs:
+            errors.append((counter, pred))
+            counter = counter + 1
+        k = 5
+        # Maximized error significance (w = 1 puts all weight on the error term)
+        w = 1
+        alpha = 4
+        top_k = Topk(k)
+        debug = True
+        b_update = True
+        first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+                                                  self.loss_type, top_k, alpha, w)
+        first_level_nodes = first_level[0]
+        slice_member = first_level_nodes[(7, 'x2_2')]
+        self.assertGreater(slice_member.score, self.slice_member.score)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/scripts/staging/slicing/utils/model_generator.py b/scripts/staging/slicing/utils/model_generator.py
new file mode 100644
index 0000000..8f80b74
--- /dev/null
+++ b/scripts/staging/slicing/utils/model_generator.py
@@ -0,0 +1,77 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import pandas as pd
+from sklearn import linear_model
+import numpy as np
+from sklearn.metrics import mean_squared_error, r2_score
+
+
+def generate_dataset(beta, n):
+    # note: `beta` is currently unused
+    # generate x as an array of `n` samples with 3 attributes, each taking a
+    # value between 1 and 3 (np.random.randint excludes the upper bound)
+    x = np.random.randint(1, 4, (n, 3))
+    # calculate y as the noise-free linear combination y = 2*x0 + 3*x1 + x2
+    coeff = [2, 3, 1]
+    coeff_mat = np.array(coeff).reshape(len(coeff), 1)
+    y = np.matmul(x, coeff_mat)
+    dataset = np.append(x, y, axis=1)
+    np.savetxt("toy_train.csv", dataset, delimiter=",", fmt='%s')
+    return x, y
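+
+# example usage: x, y = generate_dataset(10, 100) produces 100 samples whose
+# target is the noise-free combination y = 2*x0 + 3*x1 + x2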
+
+
+if __name__ == '__main__':
+    def main():
+        x, y = generate_dataset(10, 100)
+        x_train = x
+        y_train = y
+        # (generate_dataset already wrote the training data to toy_train.csv)
+        model = linear_model.LinearRegression()
+
+        # Train the model using the training data that we created
+        model.fit(x_train, y_train)
+        # Now that we have trained the model, we can print the coefficient of x that it has predicted
+        print('Coefficients: \n', model.coef_)
+
+        # We then use the model to make predictions based on the test values of x
+        test_dataset = pd.read_csv("../datasets/toy.csv")
+        attributes_amount = len(test_dataset.values[0])
+        y_test = test_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+        x_test = test_dataset.iloc[:, 0:attributes_amount - 1].values
+        y_pred = model.predict(x_test)
+
+        # Now, we can calculate the models accuracy metrics based on what the actual value of y was
+        print("Mean squared error: %.2f"
+              % mean_squared_error(y_test, y_pred))
+        print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+        # Coefficients: [[2. 3. 1.]]
+        # Mean squared error: 3.00
+        # r_2 statistic: 0.72
+
+        # dataset = np.append(x, y, axis=1)
+        # np.savetxt("toy.csv", dataset, delimiter=",", fmt='%s')
+
+        return model, x_test, y_test, y_pred
+
+    # invoke main() so the script actually runs when executed directly
+    main()