You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/08/28 20:53:17 UTC
[systemds] branch master updated: [SYSTEMDS-2641] Updated slice finding algorithms and tests
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new c8a96b0 [SYSTEMDS-2641] Updated slice finding algorithms and tests
c8a96b0 is described below
commit c8a96b079e55f0cc7cd80c8827b2225336243307
Author: gilgenbergg <la...@gmail.com>
AuthorDate: Fri Aug 28 22:50:33 2020 +0200
[SYSTEMDS-2641] Updated slice finding algorithms and tests
Closes #1041.
---
scripts/staging/slicing/base/SparkNode.py | 8 +-
scripts/staging/slicing/base/node.py | 18 +-
scripts/staging/slicing/base/slicer.py | 63 ++-
scripts/staging/slicing/base/top_k.py | 4 +-
scripts/staging/slicing/base/union_slicer.py | 32 +-
.../slicing/spark_modules/join_data_parallel.py | 17 +-
.../staging/slicing/spark_modules/spark_slicer.py | 10 +-
.../slicing/spark_modules/spark_union_slicer.py | 10 +-
.../staging/slicing/spark_modules/spark_utils.py | 138 -------
.../slicing/spark_modules/union_data_parallel.py | 11 +-
.../compas_bd.py} | 81 ++--
.../slicing/tests/classification/test_adult.py | 9 +-
.../{test_adult.py => test_compas.py} | 64 ++-
.../slicing/tests/regression/bd_spark_salary.py | 15 +-
.../slicing/tests/regression/spark_salary.py | 56 ++-
.../staging/slicing/tests/regression/toy_tests.py | 428 +++++++++++++++++++++
scripts/staging/slicing/utils/model_generator.py | 77 ++++
17 files changed, 742 insertions(+), 299 deletions(-)
diff --git a/scripts/staging/slicing/base/SparkNode.py b/scripts/staging/slicing/base/SparkNode.py
index a123624..81bfcb9 100644
--- a/scripts/staging/slicing/base/SparkNode.py
+++ b/scripts/staging/slicing/base/SparkNode.py
@@ -46,6 +46,7 @@ class SparkNode:
self.size = 0
self.score = 0
self.loss = 0
+ self.s_upper = 0
self.s_lower = 1
self.key = ''
@@ -61,8 +62,6 @@ class SparkNode:
def process_slice(self, loss_type):
mask = self.make_slice_mask()
- print("mask")
- print(mask)
if loss_type == 0:
self.calc_l2(mask)
elif loss_type == 1:
@@ -82,7 +81,6 @@ class SparkNode:
def calc_l2(self, mask):
max_tuple_error = 0
sum_error = 0
- size = 0
filtered = self.filter_by_mask(mask)
self.size = len(filtered)
for row in filtered:
@@ -150,8 +148,8 @@ class SparkNode:
def make_key(self):
return self.name
- def check_constraint(self, top_k, x_size, alpha):
- return self.score >= top_k.min_score and self.size >= x_size / alpha
+ def check_constraint(self, top_k, x_size, alpha, cur_min):
+ return self.score >= cur_min and self.size >= x_size / alpha
def check_bounds(self, top_k, x_size, alpha):
return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
diff --git a/scripts/staging/slicing/base/node.py b/scripts/staging/slicing/base/node.py
index 4d80651..f7b553b 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/node.py
@@ -108,11 +108,13 @@ class Node:
return list(filter(lambda row: all(row[1][attr] == 1 for attr in mask), self.complete_x))
def calc_s_upper(self, cur_lvl):
- cur_min = self.parents[0].size
- for parent in self.parents:
- if cur_lvl == 1:
+ if cur_lvl == 1:
+ cur_min = self.parents[0].size
+ for parent in self.parents:
cur_min = min(cur_min, parent.size)
- else:
+ else:
+ cur_min = self.parents[0].s_upper
+ for parent in self.parents:
cur_min = min(cur_min, parent.s_upper)
return cur_min
@@ -157,11 +159,11 @@ class Node:
def make_key(self, new_id):
return new_id, self.name
- def check_constraint(self, top_k, x_size, alpha):
- return self.score >= top_k.min_score and self.size >= x_size / alpha
+ def check_constraint(self, top_k, x_size, alpha, cur_min):
+ return self.score >= cur_min and self.size >= x_size / alpha
- def check_bounds(self, top_k, x_size, alpha):
- return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
+ def check_bounds(self, x_size, alpha, cur_min):
+ return self.s_upper >= x_size / alpha and self.c_upper >= cur_min
def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
if self.s_upper:
diff --git a/scripts/staging/slicing/base/slicer.py b/scripts/staging/slicing/base/slicer.py
index a3c4a48..77851b6 100644
--- a/scripts/staging/slicing/base/slicer.py
+++ b/scripts/staging/slicing/base/slicer.py
@@ -21,6 +21,7 @@
from slicing.base.node import Node
from slicing.base.top_k import Topk
+import matplotlib.pyplot as plt
# optimization function calculation:
@@ -39,9 +40,11 @@ def opt_fun(fi, si, f, x_size, w):
# invalid combination example: node ABC + CDE (on 4th level) // result node - ABCDE (absurd for 4th level)
def slice_name_nonsense(node_i, node_j, cur_lvl):
attr1 = list(map(lambda x: x[0].split("_")[0], node_i.attributes))
+ key1 = node_i.key[0]
attr2 = list(map(lambda x: x[0].split("_")[0], node_j.attributes))
+ key2 = node_j.key[0]
commons = len(list(set(attr1) & set(attr2)))
- return commons == cur_lvl - 1
+ return commons == cur_lvl - 1 and key1 < key2
def union(lst1, lst2):
@@ -50,7 +53,7 @@ def union(lst1, lst2):
def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, top_k, alpha, w):
- first_level = []
+ first_level = {}
counter = 0
all_nodes = {}
# First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
@@ -65,10 +68,10 @@ def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, los
new_node.process_slice(loss_type)
new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w)
new_node.c_upper = new_node.score
- first_level.append(new_node)
+ first_level[new_node.key] = new_node
new_node.print_debug(top_k, 0)
# constraints for 1st level nodes to be problematic candidates
- if new_node.check_constraint(top_k, x_size, alpha):
+ if new_node.check_constraint(top_k, x_size, alpha, 1):
# this method updates top k slices if needed
top_k.add_new_top_slice(new_node)
counter = counter + 1
@@ -77,9 +80,10 @@ def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, los
def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug, alpha, w, loss_type, b_update, cur_lvl,
all_nodes, top_k, cur_lvl_nodes):
- for node_j in range(len(prev_lvl)):
+ for node_j in prev_lvl:
flag = slice_name_nonsense(prev_lvl[node_i], prev_lvl[node_j], cur_lvl)
- if flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
+ required_number = len(union(prev_lvl[node_i].attributes, prev_lvl[node_j].attributes))
+ if flag and required_number == cur_lvl + 1:
new_node = Node(complete_x, loss, x_size, y_test, errors)
parents_set = set(new_node.parents)
parents_set.add(prev_lvl[node_i])
@@ -92,8 +96,8 @@ def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug,
new_node.name = new_node.make_name()
new_id = len(all_nodes)
new_node.key = new_node.make_key(new_id)
- if new_node.key[1] in all_nodes:
- existing_item = all_nodes[new_node.key[1]]
+ if new_node.key[1] in cur_lvl_nodes:
+ existing_item = cur_lvl_nodes[new_node.key[1]]
parents_set = set(existing_item.parents)
existing_item.parents = parents_set
if b_update:
@@ -104,18 +108,18 @@ def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug,
new_node.update_bounds(s_upper, s_lower, e_upper, e_max_upper, w)
else:
new_node.calc_bounds(cur_lvl, w)
- all_nodes[new_node.key[1]] = new_node
+ all_nodes[new_node.key[0]] = new_node
# check if concrete data should be extracted or not (only for those that have score upper
# big enough and if size of subset is big enough
- to_slice = new_node.check_bounds(top_k, x_size, alpha)
+ to_slice = new_node.check_bounds(x_size, alpha, top_k.min_score)
if to_slice:
new_node.process_slice(loss_type)
new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w)
# we decide to add node to current level nodes (in order to make new combinations
# on the next one or not basing on its score value
- if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+ if new_node.check_constraint(top_k, x_size, alpha, top_k.min_score) and new_node.key not in top_k.keys:
top_k.add_new_top_slice(new_node)
- cur_lvl_nodes.append(new_node)
+ cur_lvl_nodes[new_node.key[1]] = new_node
if debug:
new_node.print_debug(top_k, cur_lvl)
return cur_lvl_nodes, all_nodes
@@ -131,30 +135,51 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
levels = []
top_k = Topk(k)
first_level = make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, top_k, alpha, w)
+ candidates = []
+ pruned = []
+ indexes = []
+ indexes.append(1)
+ candidates.append(len(first_level[0]))
+ pruned.append(len(first_level[0]))
all_nodes = first_level[1]
levels.append(first_level[0])
# cur_lvl - index of current level, correlates with number of slice forming features
cur_lvl = 1 # currently filled level after first init iteration
- # currently for debug
- print("Level 1 had " + str(len(all_features)) + " candidates")
print()
print("Current topk are: ")
top_k.print_topk()
# combining each candidate of previous level with every till it becomes useless (one node can't make a pair)
while len(levels[cur_lvl - 1]) > 1:
- cur_lvl_nodes = []
+ cur_lvl_nodes = {}
prev_lvl = levels[cur_lvl - 1]
- for node_i in range(len(prev_lvl)):
+ level_candidates = len(prev_lvl) * (len(prev_lvl) - 1)
+ candidates.append(level_candidates)
+ for node_i in prev_lvl:
partial = join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug, alpha, w, loss_type,
b_update, cur_lvl, all_nodes, top_k, cur_lvl_nodes)
cur_lvl_nodes = partial[0]
all_nodes = partial[1]
cur_lvl = cur_lvl + 1
+ indexes.append(cur_lvl)
levels.append(cur_lvl_nodes)
- top_k.print_topk()
- print("Level " + str(cur_lvl) + " had " + str(len(prev_lvl) * (len(prev_lvl) - 1)) +
+ print("Level " + str(cur_lvl) + " had " + str(candidates) +
" candidates but after pruning only " + str(len(cur_lvl_nodes)) + " go to the next level")
- print("Program stopped at level " + str(cur_lvl + 1))
+ pruned.append(len(cur_lvl_nodes))
+ print()
+ print("Current topk are: ")
+ top_k.print_topk()
+ plt.plot(indexes, candidates, 'r--',
+ indexes, pruned, 'g--')
+ plt.xlabel('Level')
+ plt.ylabel('Number of slices')
+ plt.show()
+ print("Program stopped at level " + str(cur_lvl))
print()
print("Selected slices are: ")
top_k.print_topk()
+ print("candidates:")
+ print(candidates)
+ print(">>>>>>>>>")
+ print("pruned:")
+ print(pruned)
+ return top_k
diff --git a/scripts/staging/slicing/base/top_k.py b/scripts/staging/slicing/base/top_k.py
index 3957fea..38c6545 100644
--- a/scripts/staging/slicing/base/top_k.py
+++ b/scripts/staging/slicing/base/top_k.py
@@ -51,8 +51,8 @@ class Topk:
for candidate in self.slices:
print(candidate.name + ": " + "score = " + str(candidate.score) + "; size = " + str(candidate.size))
- def buckets_top_k(self, cur_lvl_slices, x_size, alpha):
+ def buckets_top_k(self, cur_lvl_slices, x_size, alpha, cur_min):
for bucket in cur_lvl_slices:
- if bucket.check_constraint(self, x_size, alpha):
+ if bucket.size > x_size / alpha and bucket.score >= cur_min:
self.add_new_top_slice(bucket)
return self
diff --git a/scripts/staging/slicing/base/union_slicer.py b/scripts/staging/slicing/base/union_slicer.py
index 78b7d2b..8ec64d5 100644
--- a/scripts/staging/slicing/base/union_slicer.py
+++ b/scripts/staging/slicing/base/union_slicer.py
@@ -22,6 +22,7 @@
from slicing.base.node import Node
from slicing.base.top_k import Topk
from slicing.base.slicer import opt_fun, union
+import matplotlib.pyplot as plt
def check_attributes(left_node, right_node):
@@ -69,6 +70,12 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
first_level = make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, w, alpha, top_k)
# double appending of first level nodes in order to enumerating second level in the same way as others
levels.append((first_level[0], len(all_features)))
+ candidates = []
+ pruned = []
+ indexes = []
+ indexes.append(1)
+ candidates.append(len(first_level[0]))
+ pruned.append(len(first_level[0]))
all_nodes = first_level[1]
# cur_lvl - index of current level, correlates with number of slice forming features
cur_lvl = 1 # level that is planned to be filled later
@@ -82,12 +89,14 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
while len(cur_lvl_nodes) > 0:
cur_lvl_nodes = []
count = 0
+ prev_lvl = levels[cur_lvl - 1]
for left in range(int(cur_lvl / 2) + 1):
right = cur_lvl - 1 - left
for node_i in range(len(levels[left][0])):
for node_j in range(len(levels[right][0])):
flag = check_attributes(levels[left][0][node_i], levels[right][0][node_j])
- if flag:
+ required_number = len(union(levels[left][0][node_i].attributes, levels[right][0][node_j].attributes))
+ if flag and required_number == cur_lvl + 1:
new_node = Node(complete_x, loss, x_size, y_test, errors)
parents_set = set(new_node.parents)
parents_set.add(levels[left][0][node_i])
@@ -115,24 +124,39 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
all_nodes[new_node.key[1]] = new_node
# check if concrete data should be extracted or not (only for those that have score upper
# big enough and if size of subset is big enough
- to_slice = new_node.check_bounds(top_k, x_size, alpha)
+ to_slice = new_node.check_bounds(x_size, alpha, top_k.min_score)
if to_slice:
new_node.process_slice(loss_type)
new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w)
# we decide to add node to current level nodes (in order to make new combinations
# on the next one or not basing on its score value
- if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+ if new_node.check_constraint(top_k, x_size, alpha, top_k.min_score) and new_node.key \
+ not in top_k.keys:
top_k.add_new_top_slice(new_node)
cur_lvl_nodes.append(new_node)
if debug:
new_node.print_debug(top_k, cur_lvl)
count = count + levels[left][1] * levels[right][1]
+ cur_lvl = cur_lvl + 1
+ indexes.append(cur_lvl)
print("Level " + str(cur_lvl) + " had " + str(count) +
" candidates but after pruning only " + str(len(cur_lvl_nodes)) + " go to the next level")
- cur_lvl = cur_lvl + 1
levels.append((cur_lvl_nodes, count))
+ candidates.append(count)
+ pruned.append(len(cur_lvl_nodes))
top_k.print_topk()
+ plt.plot(indexes, candidates, 'r--',
+ indexes, pruned, 'g--')
+ plt.xlabel('Level')
+ plt.ylabel('Number of slices')
+ plt.show()
print("Program stopped at level " + str(cur_lvl))
print()
print("Selected slices are: ")
top_k.print_topk()
+ print("candidates:")
+ print(candidates)
+ print(">>>>>>>>>")
+ print("pruned:")
+ print(pruned)
+ return top_k
diff --git a/scripts/staging/slicing/spark_modules/join_data_parallel.py b/scripts/staging/slicing/spark_modules/join_data_parallel.py
index 4f2ff0e..f55b0ca 100644
--- a/scripts/staging/slicing/spark_modules/join_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/join_data_parallel.py
@@ -29,7 +29,8 @@ from slicing.spark_modules.spark_utils import approved_join_slice
def rows_mapper(row, buckets, loss_type):
- filtered = dict(filter(lambda bucket: all(attr in row[1] for attr in bucket[1].attributes), buckets.items()))
+ # filtered = dict(filter(lambda bucket: all(attr in row[1] for attr in bucket[1].attributes), buckets.items()))
+ filtered = dict(filter(lambda bucket: all(attr in row[0] for attr in bucket[1].attributes), buckets.items()))
for item in filtered:
filtered[item].update_metrics(row, loss_type)
return filtered
@@ -79,8 +80,8 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
bucket = Bucket(node, cur_lvl, w, x_size, loss)
buckets[bucket.name] = bucket
b_buckets = SparkContext.broadcast(sc, buckets)
- rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2]))\
- .map(lambda item: (item[0], item[1].tolist(), item[2]))
+ rows = predictions.rdd.map(lambda row: (row[1].indices, row[2]))\
+ .map(lambda item: list(item))
mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
flattened = mapped.flatMap(lambda line: (line.items()))
reduced = flattened.combineByKey(combiner, merge_values, merge_combiners)
@@ -90,10 +91,11 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
cur_lvl_nodes.map(lambda bucket: bucket.print_debug(b_topk.value)).collect()
cur_lvl = 1
prev_level = cur_lvl_nodes.collect()
- top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha, 1)
while len(prev_level) > 0:
b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
b_topk = SparkContext.broadcast(sc, top_k)
+ cur_min = top_k.min_score
b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
top_k.print_topk()
buckets = join_enum(prev_level, cur_lvl, x_size, alpha, top_k, w, loss)
@@ -109,12 +111,13 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
.map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value))\
.collect()
cur_lvl += 1
- top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha, cur_min)
print("Level " + str(cur_lvl) + " had " + str(
- len(b_cur_lvl_nodes.value * (len(b_cur_lvl_nodes.value) - 1)))+" candidates but after pruning only " +
+ len(b_cur_lvl_nodes.value * (len(prev_level) - 1)))+" candidates but after pruning only " +
str(len(prev_level)) + " go to the next level")
- print("Program stopped at level " + str(cur_lvl))
+ top_k.print_topk()
print()
+ print("Program stopped at level " + str(cur_lvl - 1))
print("Selected slices are: ")
top_k.print_topk()
return None
diff --git a/scripts/staging/slicing/spark_modules/spark_slicer.py b/scripts/staging/slicing/spark_modules/spark_slicer.py
index 83de579..9da5b98 100644
--- a/scripts/staging/slicing/spark_modules/spark_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_slicer.py
@@ -73,7 +73,7 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
b_topk.value, w, loss_type)) \
.map(lambda node: (node.key, node)).collect()
first_level.update(init_slices)
- update_top_k(first_level, top_k, alpha, predictions)
+ update_top_k(first_level, top_k, alpha, predictions, 1)
prev_level = SparkContext.broadcast(sc, first_level)
levels.append(prev_level)
cur_lvl = cur_lvl + 1
@@ -82,20 +82,22 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
while len(levels[cur_lvl - 1].value) > 0:
nodes_list = {}
b_topk = SparkContext.broadcast(sc, top_k)
+ cur_min = top_k.min_score
partitions = sc.parallelize(levels[cur_lvl - 1].value.values())
mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(nodes, levels[cur_lvl - 1].value.values(),
predictions, loss, b_topk.value, alpha, k, w,
- loss_type, cur_lvl, debug, enumerator))
+ loss_type, cur_lvl, debug, enumerator, cur_min))
flattened = mapped.flatMap(lambda node: node)
nodes_list.update(flattened.map(lambda node: (node.key, node)).distinct().collect())
prev_level = SparkContext.broadcast(sc, nodes_list)
levels.append(prev_level)
- update_top_k(nodes_list, top_k, alpha, predictions)
+ update_top_k(nodes_list, top_k, alpha, predictions, cur_min)
cur_lvl = cur_lvl + 1
b_topk.value.print_topk()
print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
" candidates but after pruning only " + str(len(nodes_list)) + " go to the next level")
- print("Program stopped at level " + str(cur_lvl))
+ print("Program stopped at level " + str(cur_lvl - 1))
print()
print("Selected slices are: ")
top_k.print_topk()
+ return top_k
diff --git a/scripts/staging/slicing/spark_modules/spark_union_slicer.py b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
index 02ee716..94172de 100644
--- a/scripts/staging/slicing/spark_modules/spark_union_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
@@ -40,7 +40,7 @@ def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type,
.map(lambda node: (node.key, node)) \
.collect()
first_level.update(init_slices)
- update_top_k(first_level, top_k, alpha, predictions)
+ update_top_k(first_level, top_k, alpha, predictions, 1)
prev_level = SparkContext.broadcast(sc, first_level)
levels.append(prev_level)
cur_lvl = 1
@@ -48,24 +48,26 @@ def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type,
while len(levels[cur_lvl - 1].value) > 0:
cur_lvl_res = {}
b_topk = SparkContext.broadcast(sc, top_k)
+ cur_min = top_k.min_score
for left in range(int(cur_lvl / 2) + 1):
right = cur_lvl - left - 1
partitions = sc.parallelize(levels[left].value.values())
mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(nodes, levels[right].value.values(),
predictions, loss, b_topk.value, alpha, k,
w, loss_type, cur_lvl, debug,
- enumerator))
+ enumerator, cur_min))
flattened = mapped.flatMap(lambda node: node)
partial = flattened.map(lambda node: (node.key, node)).collect()
cur_lvl_res.update(partial)
prev_level = SparkContext.broadcast(sc, cur_lvl_res)
levels.append(prev_level)
- update_top_k(cur_lvl_res, top_k, alpha, predictions)
+ update_top_k(cur_lvl_res, top_k, alpha, predictions, cur_min)
cur_lvl = cur_lvl + 1
top_k.print_topk()
print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
" candidates but after pruning only " + str(len(prev_level.value)) + " go to the next level")
- print("Program stopped at level " + str(cur_lvl))
+ print("Program stopped at level " + str(cur_lvl - 1))
print()
print("Selected slices are: ")
top_k.print_topk()
+ return top_k
diff --git a/scripts/staging/slicing/spark_modules/spark_utils.py b/scripts/staging/slicing/spark_modules/spark_utils.py
deleted file mode 100644
index cb83c71..0000000
--- a/scripts/staging/slicing/spark_modules/spark_utils.py
+++ /dev/null
@@ -1,138 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-
-from pyspark.sql.functions import udf
-from pyspark.sql.types import FloatType
-
-from slicing.base.SparkNode import SparkNode
-from slicing.base.slicer import opt_fun, union
-
-calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, prediction, type), FloatType())
-model_type_init = udf(lambda type: init_model_type(type))
-
-
-def calc_loss_fun(target, prediction, type):
- if type == 0:
- return (prediction - target) ** 2
- elif type == 1:
- return float(target == prediction)
-
-
-def init_model_type(model_type):
- if model_type == "regression":
- return 0
- elif model_type == "classification":
- return 1
-
-
-def approved_join_slice(node_i, node_j, cur_lvl):
- commons = len(list(set(node_i.attributes) & set(node_j.attributes)))
- return commons == cur_lvl - 1
-
-
-def approved_union_slice(node_i, node_j):
- if set(node_i.attributes).intersection(set(node_j.attributes)):
- return False
- return True
-
-
-def make_first_level(features, predictions, loss, top_k, w, loss_type):
- first_level = []
- # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
- for feature in features:
- new_node = SparkNode(loss, predictions)
- new_node.parents = [feature]
- new_node.attributes.append(feature)
- new_node.name = new_node.make_name()
- new_node.key = new_node.make_key()
- new_node.process_slice(loss_type)
- new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
- new_node.c_upper = new_node.score
- first_level.append(new_node)
- new_node.print_debug(top_k, 0)
- return first_level
-
-
-def process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha, loss_type, w, debug, enumerator):
- cur_enum_nodes = []
- for node_j in level:
- if enumerator == "join":
- flag = approved_join_slice(node_i, node_j, cur_lvl)
- else:
- flag = approved_union_slice(node_i, node_j)
- if flag and int(node_i.name.split("&&")[0]) < int(node_j.name.split("&&")[0]):
- new_node = SparkNode(loss, predictions)
- parents_set = set(new_node.parents)
- parents_set.add(node_i)
- parents_set.add(node_j)
- new_node.parents = list(parents_set)
- parent1_attr = node_i.attributes
- parent2_attr = node_j.attributes
- new_node_attr = union(parent1_attr, parent2_attr)
- new_node.attributes = new_node_attr
- new_node.name = new_node.make_name()
- new_node.key = new_node.make_key()
- new_node.calc_bounds(cur_lvl, w)
- to_slice = new_node.check_bounds(top_k, len(predictions), alpha)
- if to_slice:
- new_node.process_slice(loss_type)
- new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
- if new_node.check_constraint(top_k, len(predictions), alpha):
- cur_enum_nodes.append(new_node)
- if debug:
- new_node.print_debug(top_k, cur_lvl)
- return cur_enum_nodes
-
-
-def nodes_enum(nodes, level, predictions, loss, top_k, alpha, k, w, loss_type, cur_lvl, debug, enumerator):
- cur_enum_nodes = []
- for node_i in nodes:
- partial_nodes = process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha,
- loss_type, w, debug, enumerator)
- cur_enum_nodes.append(partial_nodes)
- return cur_enum_nodes
-
-
-def init_top_k(first_level, top_k, alpha, predictions):
- # driver updates topK
- for sliced in first_level:
- if sliced[1].check_constraint(top_k, len(predictions), alpha):
- # this method updates top k slices if needed
- top_k.add_new_top_slice(sliced[1])
-
-
-def update_top_k(new_slices, top_k, alpha, predictions):
- # driver updates topK
- for sliced in new_slices.values():
- if sliced.check_constraint(top_k, len(predictions), alpha):
- # this method updates top k slices if needed
- top_k.add_new_top_slice(sliced)
-
-
-def calc_bucket_metrics(bucket, loss, w, x_size, cur_lvl):
- bucket.calc_error()
- bucket.score = opt_fun(bucket.error, bucket.size, loss, x_size, w)
- if cur_lvl == 0:
- bucket.s_upper = bucket.size
- bucket.c_upper = bucket.score
- bucket.s_lower = 1
- return bucket
diff --git a/scripts/staging/slicing/spark_modules/union_data_parallel.py b/scripts/staging/slicing/spark_modules/union_data_parallel.py
index dcd1af3..5ea70cc 100644
--- a/scripts/staging/slicing/spark_modules/union_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/union_data_parallel.py
@@ -70,8 +70,10 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
bucket = Bucket(node, cur_lvl, w, x_size, loss)
buckets[bucket.name] = bucket
b_buckets = SparkContext.broadcast(sc, buckets)
- rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
- .map(lambda item: (item[0], item[1].tolist(), item[2]))
+ # rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
+ # .map(lambda item: (item[0], item[1].tolist(), item[2]))
+ rows = predictions.rdd.map(lambda row: row[1].indices) \
+ .map(lambda item: list(item))
mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
flattened = mapped.flatMap(lambda line: (line.items()))
reduced = flattened.combineByKey(combiner, join_data_parallel.merge_values, join_data_parallel.merge_combiners)
@@ -83,9 +85,10 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
prev_level = cur_lvl_nodes.collect()
b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
levels.append(b_cur_lvl_nodes)
- top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha, 1)
while len(prev_level) > 0:
b_topk = SparkContext.broadcast(sc, top_k)
+ cur_min = top_k.min_score
b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
top_k.print_topk()
buckets = []
@@ -104,7 +107,7 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
partial = flattened.combineByKey(combiner, join_data_parallel.merge_values, join_data_parallel.merge_combiners)
prev_level = partial\
.map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value)).collect()
- top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha, cur_min)
b_topk = SparkContext.broadcast(sc, top_k)
if debug:
partial.values().map(lambda bucket: bucket.print_debug(b_topk.value)).collect()
diff --git a/scripts/staging/slicing/tests/regression/bd_spark_salary.py b/scripts/staging/slicing/tests/classification/compas_bd.py
similarity index 64%
copy from scripts/staging/slicing/tests/regression/bd_spark_salary.py
copy to scripts/staging/slicing/tests/classification/compas_bd.py
index c32414b..3f1a5b3 100644
--- a/scripts/staging/slicing/tests/regression/bd_spark_salary.py
+++ b/scripts/staging/slicing/tests/classification/compas_bd.py
@@ -23,6 +23,8 @@ import sys
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
+from pyspark.ml.classification import RandomForestClassifier
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, IndexToString
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SQLContext
@@ -32,8 +34,6 @@ from sklearn.model_selection import train_test_split
from slicing.spark_modules import spark_utils, join_data_parallel, union_data_parallel
-binner = udf(lambda arg: int(int(arg) // 5))
-
if __name__ == "__main__":
args = sys.argv
@@ -48,43 +48,41 @@ if __name__ == "__main__":
debug = args[5]
loss_type = int(args[6])
enumerator = args[7]
+ dataset = args[8]
else:
k = 10
- w = 0.5
- alpha = 6
+ w = 0.1
+ alpha = 4
b_update = True
debug = True
loss_type = 0
+ dataset = 'compas-scores-two-years.csv'
enumerator = "union"
- conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
- num_partitions = 2
- model_type = "regression"
- label = 'salary'
+ conf = SparkConf().setAppName("salary_test").setMaster('local[8]')
+ num_partitions = 8
+ model_type = "classification"
+ label = "is_recid"
sparkContext = SparkContext(conf=conf)
sqlContext = SQLContext(sparkContext)
- fileRDD = sparkContext.textFile('salaries.csv', num_partitions)
- header = fileRDD.first()
- head_split = header.split(",")
- head_split[0] = '_c0'
- fileRDD = fileRDD.filter(lambda line: line != header)
- data = fileRDD.map(lambda row: row.split(","))
- dataset_df = sqlContext.createDataFrame(data, head_split)
+ dataset_df = sqlContext.read\
+ .option("badRecordsPath", "/tmp/badRecordsPath")\
+ .csv(dataset, header='true',
+ inferSchema='true')
+ # fileRDD = sparkContext.textFile(dataset, num_partitions)
+ # header = fileRDD.first()
+ # head_split = header.split(",")
+ # fileRDD = fileRDD.filter(lambda line: line != header)
+ # data = fileRDD.map(lambda row: row.split(","))
+ # dataset_df = sqlContext.createDataFrame(data, head_split)
- cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
- # initializing stages of main transformation pipeline
+ cat_features = ["sex", "age_cat", "race", "decile_score10", "c_charge_degree", "c_charge_desc"]
stages = []
- dataset_df = dataset_df.drop('_c0')
dataset_df = dataset_df.withColumn("id", sf.monotonically_increasing_id())
- # bining numeric features by local binner udf function (specified for current dataset if needed)
- dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
- dataset_df = dataset_df.withColumn('service_bin', binner(dataset_df['service']))
- dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
- dataset_df = dataset_df.drop('sincephd', 'service')
dataset_df = dataset_df.withColumn('target', dataset_df[label].cast("int"))
- # hot encoding categorical features
+ dataset_df = dataset_df.withColumn('model_type', sf.lit(1))
for feature in cat_features:
- string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
+ string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index").setHandleInvalid("skip")
encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
encoder.setDropLast(False)
stages += [string_indexer, encoder]
@@ -113,19 +111,26 @@ if __name__ == "__main__":
train, test = train_test_split(df_transform_fin, test_size=0.3, random_state=0)
train_df = sqlContext.createDataFrame(train)
test_df = sqlContext.createDataFrame(test)
- lr = LinearRegression(featuresCol='features', labelCol='target', maxIter=10, regParam=0.3, elasticNetParam=0.8)
- lr_model = lr.fit(train_df)
- eval = lr_model.evaluate(test_df)
- f_l2 = eval.meanSquaredError
- pred = eval.predictions
- pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred['target'], pred['prediction'], pred['model_type']))
+
+ rf = RandomForestClassifier(featuresCol='features', labelCol="target", numTrees=10)
+ rf_model = rf.fit(train_df)
+ predictions = rf_model.transform(test_df)
+ # Select example rows to display.
+ predictions.select("id", "features", "target", "prediction", "model_type")
+ # Select (prediction, true label) and compute test error
+ evaluator = MulticlassClassificationEvaluator(
+ labelCol="target", predictionCol="prediction", metricName="accuracy")
+ accuracy = evaluator.evaluate(predictions)
+ loss = 1.0 - accuracy
+ pred_df_fin = predictions.withColumn('error',
+ spark_utils.calc_loss(predictions["target"], predictions['prediction'],
+ predictions['model_type']))
predictions = pred_df_fin.select('id', 'features', 'error').repartition(num_partitions)
- converter = IndexToString(inputCol='features', outputCol='cats')
- all_features = range(predictions.toPandas().values[1][1].size)
- k = 10
+ all_features = range(predictions.toPandas().values[0][0])
+
if enumerator == "join":
- join_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
- k=k, w=w, loss_type=loss_type)
+ join_data_parallel.parallel_process(all_features, predictions, loss, sparkContext, debug=debug, alpha=alpha, k=k,
+ w=w, loss_type=loss_type)
elif enumerator == "union":
- union_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
- k=k, w=w, loss_type=loss_type)
+ union_data_parallel.parallel_process(all_features, predictions, loss, sparkContext, debug=debug, alpha=alpha, k=k,
+ w=w, loss_type=loss_type)
diff --git a/scripts/staging/slicing/tests/classification/test_adult.py b/scripts/staging/slicing/tests/classification/test_adult.py
index 9575592..cacb715 100644
--- a/scripts/staging/slicing/tests/classification/test_adult.py
+++ b/scripts/staging/slicing/tests/classification/test_adult.py
@@ -23,6 +23,7 @@ import sys
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
+from sklearn import svm
from slicing.base import slicer as slicer, union_slicer
from sklearn.ensemble import RandomForestClassifier
@@ -83,14 +84,8 @@ if __name__ == "__main__":
complete_y.append((counter, y_test[counter]))
counter = counter + 1
x_size = counter
- clf = RandomForestClassifier(n_jobs=2, random_state=0)
+ clf = svm.SVC()
clf.fit(x_train, y_train)
- RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
- max_depth=None, max_features='auto', max_leaf_nodes=None,
- min_impurity_split=1e-07, min_samples_leaf=1,
- min_samples_split=2, min_weight_fraction_leaf=0.0,
- n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
- verbose=0, warm_start=False)
# alpha is size significance coefficient
# verbose option is for returning debug info while creating slices and printing it
diff --git a/scripts/staging/slicing/tests/classification/test_adult.py b/scripts/staging/slicing/tests/classification/test_compas.py
similarity index 73%
copy from scripts/staging/slicing/tests/classification/test_adult.py
copy to scripts/staging/slicing/tests/classification/test_compas.py
index 9575592..7fbb6fc 100644
--- a/scripts/staging/slicing/tests/classification/test_adult.py
+++ b/scripts/staging/slicing/tests/classification/test_compas.py
@@ -19,16 +19,15 @@
#
#-------------------------------------------------------------
-import sys
-
import pandas as pd
-from sklearn.preprocessing import OneHotEncoder
-
-from slicing.base import slicer as slicer, union_slicer
from sklearn.ensemble import RandomForestClassifier
-from sklearn import preprocessing
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+import sys
+from slicing.base import slicer, union_slicer
if __name__ == "__main__":
args = sys.argv
@@ -49,35 +48,23 @@ if __name__ == "__main__":
alpha = 4
b_update = True
debug = True
- loss_type = 1
+ loss_type = 0
enumerator = "union"
- dataset = pd.read_csv('/slicing/datasets/adult.csv')
+ file_name = 'slicing/datasets/real/compas/compas-test.csv'
+ dataset = pd.read_csv(file_name)
attributes_amount = len(dataset.values[0])
+ y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+ # starting with one not including id field
x = dataset.iloc[:, 0:attributes_amount - 1].values
- y = dataset.iloc[:, attributes_amount - 1]
- le = preprocessing.LabelEncoder()
- le.fit(y)
- y = le.transform(y)
+ # hot encoding of categorical features
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x).toarray()
complete_x = []
complete_y = []
counter = 0
- all_indexes = []
- not_encoded_columns = [
- "Age", "WorkClass", "fnlwgt", "Education", "EducationNum",
- "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
- "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income"
- ]
- for row in x:
- row[0] = int(row[0] / 10)
- row[2] = int(row[2]) // 100000
- row[4] = int(row[4] / 5)
- row[10] = int(row[10] / 1000)
- row[12] = int(row[12] / 10)
- enc = OneHotEncoder(handle_unknown='ignore')
- x = enc.fit_transform(x).toarray()
all_features = enc.get_feature_names()
- x_size = len(complete_x)
- x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
+ # train model on a whole dataset
+ x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
for item in x_test:
complete_x.append((counter, item))
complete_y.append((counter, y_test[counter]))
@@ -86,18 +73,11 @@ if __name__ == "__main__":
clf = RandomForestClassifier(n_jobs=2, random_state=0)
clf.fit(x_train, y_train)
RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
- max_depth=None, max_features='auto', max_leaf_nodes=None,
- min_impurity_split=1e-07, min_samples_leaf=1,
- min_samples_split=2, min_weight_fraction_leaf=0.0,
- n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
- verbose=0, warm_start=False)
-
- # alpha is size significance coefficient
- # verbose option is for returning debug info while creating slices and printing it
- # k is number of top-slices we want
- # w is a weight of error function significance (1 - w) is a size significance propagated into optimization function
- # loss_type = 0 (l2 in case of regression model
- # loss_type = 1 (cross entropy in case of classification model)
+ max_depth=None, max_features='auto', max_leaf_nodes=None,
+ min_impurity_split=1e-07, min_samples_leaf=1,
+ min_samples_split=2, min_weight_fraction_leaf=0.0,
+ n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
+ verbose=0, warm_start=False)
preds = clf.predict(x_test)
predictions = []
counter = 0
@@ -108,6 +88,10 @@ if __name__ == "__main__":
mistakes = mistakes + 1
counter = counter + 1
lossF = mistakes / counter
+ # alpha is size significance coefficient
+ # verbose option is for returning debug info while creating slices and printing it
+ # k is number of top-slices we want
+ # w is the weight of the error function significance; (1 - w) is the size significance propagated into the optimization function
# enumerator <union>/<join> indicates an approach of next level slices combination process:
# in case of <join> in order to create new node of current level slicer
diff --git a/scripts/staging/slicing/tests/regression/bd_spark_salary.py b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
index c32414b..1889292 100644
--- a/scripts/staging/slicing/tests/regression/bd_spark_salary.py
+++ b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
@@ -48,6 +48,7 @@ if __name__ == "__main__":
debug = args[5]
loss_type = int(args[6])
enumerator = args[7]
+ dataset = args[8]
else:
k = 10
w = 0.5
@@ -55,18 +56,18 @@ if __name__ == "__main__":
b_update = True
debug = True
loss_type = 0
- enumerator = "union"
+ dataset = 'slicing/datasets/parallel_data/salaries/rows1000.csv'
+ enumerator = "join"
- conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
- num_partitions = 2
+ conf = SparkConf().setAppName("salary_test").setMaster('local[4]')
+ num_partitions = 8
model_type = "regression"
label = 'salary'
sparkContext = SparkContext(conf=conf)
sqlContext = SQLContext(sparkContext)
- fileRDD = sparkContext.textFile('salaries.csv', num_partitions)
+ fileRDD = sparkContext.textFile(dataset, num_partitions)
header = fileRDD.first()
head_split = header.split(",")
- head_split[0] = '_c0'
fileRDD = fileRDD.filter(lambda line: line != header)
data = fileRDD.map(lambda row: row.split(","))
dataset_df = sqlContext.createDataFrame(data, head_split)
@@ -74,7 +75,6 @@ if __name__ == "__main__":
cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
# initializing stages of main transformation pipeline
stages = []
- dataset_df = dataset_df.drop('_c0')
dataset_df = dataset_df.withColumn("id", sf.monotonically_increasing_id())
# bining numeric features by local binner udf function (specified for current dataset if needed)
dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
@@ -121,8 +121,7 @@ if __name__ == "__main__":
pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred['target'], pred['prediction'], pred['model_type']))
predictions = pred_df_fin.select('id', 'features', 'error').repartition(num_partitions)
converter = IndexToString(inputCol='features', outputCol='cats')
- all_features = range(predictions.toPandas().values[1][1].size)
- k = 10
+ all_features = list(decode_dict.keys())
if enumerator == "join":
join_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
k=k, w=w, loss_type=loss_type)
diff --git a/scripts/staging/slicing/tests/regression/spark_salary.py b/scripts/staging/slicing/tests/regression/spark_salary.py
index 52d0cf2..be9868f 100644
--- a/scripts/staging/slicing/tests/regression/spark_salary.py
+++ b/scripts/staging/slicing/tests/regression/spark_salary.py
@@ -49,36 +49,71 @@ if __name__ == "__main__":
debug = args[5]
loss_type = int(args[6])
enumerator = args[7]
+ dataset = args[8]
else:
k = 10
w = 0.5
- alpha = 6
+ alpha = 10
b_update = True
debug = True
loss_type = 0
- enumerator = "join"
+ enumerator = "union"
+ dataset = 'slicing/datasets/salaries.csv'
+ # dataset = 'slicing/datasets/parallel_attr/salaries/attr3000.csv'
- conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
- num_partitions = 2
+ conf = SparkConf().setAppName("salary_test").setMaster('local[4]')
+ num_partitions = 4
model_type = "regression"
label = 'salary'
sparkContext = SparkContext(conf=conf)
sqlContext = SQLContext(sparkContext)
- dataset_df = sqlContext.read.csv('salaries.csv', header='true', inferSchema='true')
+ dataset_df = sqlContext.read.csv(dataset, header='true',
+ inferSchema='true')
# initializing stages of main transformation pipeline
stages = []
# list of categorical features for further hot-encoding
- cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+ cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"] # base
+ # cat_features = ["rank", "discipline", "sex"] # 10
+ # cat_features = ["rank", "discipline", "sincephd", "sex"] # 50
+ # cat_features = ["rank", "discipline", "sex", "company", "country", "city", "start", "dept", "card"] # 100
+ # cat_features = ["rank", "discipline", "sex", "company", "country", "city", "skills", "language", "tz", "card",
+ # "uni", "dept", "race", "code", "job", "size", "previous", "stock"] # 1013
+ # cat_features = ["rank", "discipline", "sex", "company", "country", "city", "skills", "language", "tz", "card",
+ # "uni", "dept", "origin", "children", "bmi", "code", "job", "size", "previous", "stock", "market",
+ # "freq", "smoker", "region", "WorkClass", "Education", "EducationNum", "MaritalStatus",
+ # "Occupation", "Relationship", "color", "sepal_length", "sepal_width", "petal_length",
+ # "petal_width", "variety", "symboling", "normalized-losses", "make", "fuel-type", "aspiration",
+ # "num-of-doors", "body-style", "drive-wheels", "engine-location", "wheel-base", "length", "width",
+ # "height", "curb-weight", "engine-type", "num-of-cylinders", "engine-size", "fuel-system", "bore",
+ # "stroke", "compression-ratio", "horsepower", "peak-rpm", "city-mpg", "highway-mpg", "price",
+ # "codeNumber", "clump", "cellsize", "cellshape", "adhesion", "epitel", "nuclei", "chromatin",
+ # "nucleoli", "mitoses", "class", "elevation", "aspect", "slope", "distToHydr"]
+
+# cat_features = ["rank", "discipline", "sex", "company", "country", "city", "skills", "language", "tz", "card",
+# "uni", "dept", "origin", "children", "bmi", "code", "job", "size", "previous", "stock", "market",
+# "freq", "smoker", "region", "WorkClass", "Education", "EducationNum", "MaritalStatus",
+# "Occupation", "Relationship", "color", "sepal_length", "sepal_width", "petal_length",
+# "petal_width", "variety", "symboling", "normalized-losses", "make", "fuel-type", "aspiration",
+# "num-of-doors", "body-style", "drive-wheels", "engine-location", "wheel-base", "length", "width",
+# "height", "curb-weight", "engine-type", "num-of-cylinders", "engine-size", "fuel-system", "bore",
+# "stroke", "compression-ratio", "horsepower", "peak-rpm", "city-mpg", "highway-mpg", "price",
+# "codeNumber", "clump", "cellsize", "cellshape", "adhesion", "epitel", "nuclei", "chromatin",
+# "nucleoli", "mitoses", "class", "elevation", "aspect", "slope", "distToHydr", "distToRoad",
+# "hillshadeMorning", "hillSahdeNoon", "hillSadeAfternoon", "distToFire", "DSName", "T", "N", "p",
+# "k", "Bin", "Cost", "Sdratio", "correl", "cancor1", "cancor2", "fract1", "fract2", "skewness",
+# "kurtosis", "Hc", "Hx", "Mcx", "EnAttr", "NSRation", "Alg", "error"] # 3000
+
+ # lines till 87 only for base case
# removing column with ID field
- dataset_df = dataset_df.drop('_c0')
+ # dataset_df = dataset_df.drop('_c0')
# bining numeric features by local binner udf function (specified for current dataset if needed)
dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
dataset_df = dataset_df.withColumn('service_bin', binner(dataset_df['service']))
- dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
dataset_df = dataset_df.drop('sincephd', 'service')
+ dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
# hot encoding categorical features
for feature in cat_features:
- string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
+ string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index").setHandleInvalid("skip")
encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
encoder.setDropLast(False)
stages += [string_indexer, encoder]
@@ -112,9 +147,8 @@ if __name__ == "__main__":
pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label], pred['prediction'], pred['model_type']))
predictions = pred_df_fin.select('features', 'error').repartition(num_partitions)
converter = IndexToString(inputCol='features', outputCol='cats')
- all_features = range(predictions.toPandas().values[0][0].size)
+ all_features = list(decode_dict.keys())
predictions = predictions.collect()
- k = 10
if enumerator == "join":
spark_slicer.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha, k=k, w=w,
loss_type=loss_type, enumerator=enumerator)
diff --git a/scripts/staging/slicing/tests/regression/toy_tests.py b/scripts/staging/slicing/tests/regression/toy_tests.py
new file mode 100644
index 0000000..876aa8b
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/toy_tests.py
@@ -0,0 +1,428 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import unittest
+
+import pandas as pd
+from pyspark import SparkConf, SparkContext, SQLContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+import pyspark.sql.functions as sf
+from sklearn import linear_model
+from sklearn.metrics import mean_squared_error, r2_score
+from sklearn.preprocessing import OneHotEncoder
+
+import slicing.base.slicer as slicer
+from slicing.base import union_slicer
+from slicing.base.node import Node
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
+
+
+class SliceTests(unittest.TestCase):
+ loss_type = 0
+ # x, y = m.generate_dataset(10, 100)
+ train_dataset = pd.read_csv("toy_train.csv")
+ attributes_amount = len(train_dataset.values[0])
+ model = linear_model.LinearRegression()
+ y_train = train_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+ x_train = train_dataset.iloc[:, 0:attributes_amount - 1].values
+ model.fit(x_train, y_train)
+ test_dataset = pd.read_csv("toy.csv")
+ y_test = test_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+ x_test = test_dataset.iloc[:, 0:attributes_amount - 1].values
+ y_pred = model.predict(x_test)
+ print("Mean squared error: %.2f"
+ % mean_squared_error(y_test, y_pred))
+ print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+ # Now that we have trained the model, we can print the coefficient of x that it has predicted
+ print('Coefficients: \n', model.coef_)
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x_test).toarray()
+ complete_x = []
+ complete_y = []
+ counter = 0
+ for item in x:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ all_features = enc.get_feature_names()
+ loss = mean_squared_error(y_test, y_pred)
+ devs = (y_pred - y_test) ** 2
+ errors = []
+ counter = 0
+ for pred in devs:
+ errors.append((counter, pred))
+ counter = counter + 1
+ k = 5
+ w = 0.5
+ alpha = 4
+ top_k = Topk(k)
+ debug = True
+ b_update = True
+ first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+ loss_type, top_k, alpha, w)
+ first_level_nodes = first_level[0]
+ slice_member = first_level_nodes[(7, 'x2_2')]
+
+ def test_attr_spark(self):
+ conf = SparkConf().setAppName("toy_test").setMaster('local[2]')
+ num_partitions = 2
+ enumerator = "join"
+ model_type = "regression"
+ label = 'target'
+ sparkContext = SparkContext(conf=conf)
+ sqlContext = SQLContext(sparkContext)
+ train_df = sqlContext.read.csv("toy_train.csv", header='true',
+ inferSchema='true')
+ test_df = sqlContext.read.csv("toy.csv", header='true',
+ inferSchema='true')
+ # initializing stages of main transformation pipeline
+ stages = []
+ # list of categorical features for further hot-encoding
+ cat_features = ['a', 'b', 'c']
+ for feature in cat_features:
+ string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index").setHandleInvalid("skip")
+ encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+ encoder.setDropLast(False)
+ stages += [string_indexer, encoder]
+ assembler_inputs = [feature + "_vec" for feature in cat_features]
+ assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")
+ stages += [assembler]
+ assembler_final = VectorAssembler(inputCols=["assembled_inputs"], outputCol="features")
+ stages += [assembler_final]
+ pipeline = Pipeline(stages=stages)
+ train_pipeline_model = pipeline.fit(train_df)
+ test_pipeline_model = pipeline.fit(test_df)
+ train_df_transformed = train_pipeline_model.transform(train_df)
+ test_df_transformed = test_pipeline_model.transform(test_df)
+ train_df_transformed = train_df_transformed.withColumn('model_type', sf.lit(0))
+ test_df_transformed = test_df_transformed.withColumn('model_type', sf.lit(0))
+ decode_dict = {}
+ counter = 0
+ cat = 0
+ for feature in cat_features:
+ colIdx = test_df_transformed.select(feature, feature + "_index").distinct().rdd.collectAsMap()
+ colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: item[1])}
+ for item in colIdx:
+ decode_dict[counter] = (cat, item, colIdx[item], counter)
+ counter = counter + 1
+ cat = cat + 1
+ train_df_transform_fin = train_df_transformed.select('features', label, 'model_type')
+ test_df_transform_fin = test_df_transformed.select('features', label, 'model_type')
+ lr = LinearRegression(featuresCol='features', labelCol=label, maxIter=10, regParam=0.0, elasticNetParam=0.8)
+ lr_model = lr.fit(train_df_transform_fin)
+ eval = lr_model.evaluate(test_df_transform_fin)
+ f_l2 = eval.meanSquaredError
+ pred = eval.predictions
+ pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label], pred['prediction'], pred['model_type']))
+ predictions = pred_df_fin.select('features', 'error').repartition(num_partitions)
+ converter = IndexToString(inputCol='features', outputCol='cats')
+ all_features = list(decode_dict)
+ predictions = predictions.collect()
+ spark_join = spark_slicer.parallel_process(all_features, predictions, f_l2, sparkContext, debug=self.debug, alpha=self.alpha,
+ k=self.k, w=self.w, loss_type=self.loss_type, enumerator="join")
+ spark_union = spark_union_slicer.process(all_features, predictions, f_l2, sparkContext, debug=self.debug, alpha=self.alpha,
+ k=self.k, w=self.w, loss_type=self.loss_type, enumerator="union")
+ self.assertEqual(3, len(spark_join.slices))
+ print("check1")
+ self.assertEqual(spark_join.min_score, spark_union.min_score)
+ print("check2")
+ self.assertEqual(spark_join.keys, spark_union.keys)
+ print("check3")
+ self.assertEqual(len(spark_join.slices), len(spark_union.slices))
+ print("check4")
+ idx = -1
+ for sliced in spark_join.slices:
+ idx += 1
+ self.assertEqual(sliced.score, spark_union.slices[idx].score)
+ print("check5")
+
+ def test_features_number(self):
+ self.assertEqual(len(self.all_features), 9)
+ print("check 1")
+
+ def test_base_first_level(self):
+ self.assertEqual(9, len(self.first_level_nodes))
+ print("check 2")
+
+ def test_parents_first(self):
+ self.assertIn(('x2_2', 7), self.slice_member.parents)
+ print("check 3")
+
+ def test_name(self):
+ self.assertEqual('x2_2', self.slice_member.make_name())
+ print("check 4")
+
+ def test_size(self):
+ self.assertEqual(36, self.slice_member.size)
+ print("check 5")
+
+ def test_e_upper(self):
+ self.assertEqual(81, self.slice_member.e_upper)
+ print("check 6")
+
+ def test_loss(self):
+ self.assertEqual(22, int(self.slice_member.loss))
+ print("check 7")
+
+ def test_opt_fun(self):
+ self.slice_member.score = slicer.opt_fun(self.slice_member.loss, self.slice_member.size, self.loss, len(self.x_test), self.w)
+ print("check 8")
+
+ def test_score(self):
+ self.assertEqual(1.2673015873015872, self.slice_member.score)
+ print("check 9")
+
+ def test_base_join_enum(self):
+ cur_lvl_nodes = {}
+ all_nodes = {}
+ b_update = True
+ cur_lvl = 1
+ slice_index = (2, 'x0_3')
+ combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+ len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+ self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+ self.assertEqual(6, len(combined[0]))
+ print("check1")
+
+ def test_parents_second(self):
+ cur_lvl_nodes = {}
+ all_nodes = {}
+ b_update = True
+ cur_lvl = 1
+ slice_index = (2, 'x0_3')
+ combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+ len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+ self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+ parent1 = combined[0][('x0_3 && x1_3')]
+ parent2 = combined[0][('x0_3 && x2_2')]
+ new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+ new_node.parents = [parent1, parent2]
+ parent1_attr = parent1.attributes
+ parent2_attr = parent2.attributes
+ new_node_attr = slicer.union(parent1_attr, parent2_attr)
+ self.assertEqual(new_node_attr, [('x0_3', 2), ('x1_3', 5), ('x2_2', 7)])
+ print("check2")
+
+ def test_nonsense(self):
+ cur_lvl_nodes = {}
+ all_nodes = {}
+ b_update = True
+ cur_lvl = 1
+ slice_index = (2, 'x0_3')
+ combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+ len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+ self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+ parent1 = combined[0][('x0_3 && x1_3')]
+ parent2 = combined[0][('x0_3 && x2_2')]
+ new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+ new_node.parents = [parent1, parent2]
+ parent1_attr = parent1.attributes
+ parent2_attr = parent2.attributes
+ new_node_attr = slicer.union(parent1_attr, parent2_attr)
+ new_node.attributes = new_node_attr
+ new_node.name = new_node.make_name()
+ flagTrue = slicer.slice_name_nonsense(parent1, parent2, 2)
+ self.assertEqual(True, flagTrue)
+ print("check3")
+
+ def test_non_nonsense(self):
+ cur_lvl_nodes = {}
+ all_nodes = {}
+ b_update = True
+ cur_lvl = 1
+ slice_index = (2, 'x0_3')
+ parent3 = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+ parent3.parents = [self.first_level_nodes[(4, 'x1_2')], self.first_level_nodes[(7, 'x2_2')]]
+ parent3.attributes = [('x1_2', 4), ('x2_2', 7)]
+ combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+ len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+ self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+ parent2 = combined[0]['x0_3 && x2_3']
+ parent3.key = (8, 'x1_2 && x2_2')
+ flag_nonsense = slicer.slice_name_nonsense(parent2, parent3, 2)
+ self.assertEqual(True, flag_nonsense)
+ print("check4")
+
+ def test_uppers(self):
+ cur_lvl_nodes = {}
+ all_nodes = {}
+ b_update = True
+ cur_lvl = 1
+ slice_index = (2, 'x0_3')
+ parent3 = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+ parent3.parents = [self.first_level_nodes[(4, 'x1_2')], self.first_level_nodes[(7, 'x2_2')]]
+ parent3.attributes = [('x1_2', 4), ('x2_2', 7)]
+ combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
+ len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
+ self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
+ parent1 = combined[0]['x0_3 && x1_3']
+ parent2 = combined[0]['x0_3 && x2_3']
+ new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
+ new_node.parents = [parent1, parent2]
+ new_node.calc_bounds(2, self.w)
+ self.assertEqual(25, new_node.s_upper)
+ print("check5")
+ self.assertEqual(398, int(new_node.c_upper))
+ print("check6")
+
+ def test_topk_slicing(self):
+ join_top_k = slicer.process(self.all_features, self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors,
+ self.debug, self.alpha, self.k, self.w, self.loss_type, self.b_update)
+ union_top_k = union_slicer.process(self.all_features, self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors,
+ self.debug, self.alpha, self.k, self.w, self.loss_type, self.b_update)
+ self.assertEqual(join_top_k.min_score, union_top_k.min_score)
+ print("check1")
+ self.assertEqual(join_top_k.keys, union_top_k.keys)
+ print("check2")
+ self.assertEqual(len(join_top_k.slices), len(union_top_k.slices))
+ print("check3")
+ idx = -1
+ for sliced in join_top_k.slices:
+ idx += 1
+ self.assertEqual(sliced.score, union_top_k.slices[idx].score)
+ print("check4")
+
+ def test_extreme_target(self):
+ test_dataset = pd.read_csv("/home/lana/diploma/project/slicing/datasets/toy_extreme_change.csv")
+ y_test = test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
+ x_test = test_dataset.iloc[:, 0:self.attributes_amount - 1].values
+ y_pred = self.model.predict(x_test)
+ print("Mean squared error: %.2f"
+ % mean_squared_error(y_test, y_pred))
+ print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+ # Now that we have trained the model, we can print the coefficient of x that it has predicted
+ print('Coefficients: \n', self.model.coef_)
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x_test).toarray()
+ complete_x = []
+ complete_y = []
+ counter = 0
+ for item in x:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ all_features = enc.get_feature_names()
+ loss = mean_squared_error(y_test, y_pred)
+ devs = (y_pred - y_test) ** 2
+ errors = []
+ counter = 0
+ for pred in devs:
+ errors.append((counter, pred))
+ counter = counter + 1
+ k = 5
+ w = 0.5
+ alpha = 4
+ top_k = Topk(k)
+ debug = True
+ b_update = True
+ first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+ self.loss_type, top_k, alpha, w)
+ first_level_nodes = first_level[0]
+ slice_member = first_level_nodes[(7, 'x2_2')]
+ self.assertGreater(slice_member.loss, self.slice_member.loss)
+ print("check 1")
+ self.assertGreater(slice_member.score, self.slice_member.score)
+ print("check 2")
+
+ def test_error_significance(self):
+ y_test = self.test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
+ x_test = self.test_dataset.iloc[:, 0:self.attributes_amount - 1].values
+ y_pred = self.model.predict(x_test)
+ print("Mean squared error: %.2f"
+ % mean_squared_error(y_test, y_pred))
+ print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+ # Now that we have trained the model, we can print the coefficient of x that it has predicted
+ print('Coefficients: \n', self.model.coef_)
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x_test).toarray()
+ complete_x = []
+ complete_y = []
+ counter = 0
+ for item in x:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ all_features = enc.get_feature_names()
+ loss = mean_squared_error(y_test, y_pred)
+ devs = (y_pred - y_test) ** 2
+ errors = []
+ counter = 0
+ for pred in devs:
+ errors.append((counter, pred))
+ counter = counter + 1
+ k = 5
+ # Maximized size significance
+ w = 0
+ alpha = 4
+ top_k = Topk(k)
+ debug = True
+ b_update = True
+ first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+ self.loss_type, top_k, alpha, w)
+ first_level_nodes = first_level[0]
+ slice_member = first_level_nodes[(7, 'x2_2')]
+ self.assertGreater(self.slice_member.score, slice_member.score)
+
+ def test_size_significance(self):
+ y_test = self.test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
+ x_test = self.test_dataset.iloc[:, 0:self.attributes_amount - 1].values
+ y_pred = self.model.predict(x_test)
+ print("Mean squared error: %.2f"
+ % mean_squared_error(y_test, y_pred))
+ print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+ # Now that we have trained the model, we can print the coefficient of x that it has predicted
+ print('Coefficients: \n', self.model.coef_)
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x_test).toarray()
+ complete_x = []
+ complete_y = []
+ counter = 0
+ for item in x:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ all_features = enc.get_feature_names()
+ loss = mean_squared_error(y_test, y_pred)
+ devs = (y_pred - y_test) ** 2
+ errors = []
+ counter = 0
+ for pred in devs:
+ errors.append((counter, pred))
+ counter = counter + 1
+ k = 5
+ # Maximized error significance
+ w = 1
+ alpha = 4
+ top_k = Topk(k)
+ debug = True
+ b_update = True
+ first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
+ self.loss_type, top_k, alpha, w)
+ first_level_nodes = first_level[0]
+ slice_member = first_level_nodes[(7, 'x2_2')]
+ self.assertGreater(slice_member.score, self.slice_member.score)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/staging/slicing/utils/model_generator.py b/scripts/staging/slicing/utils/model_generator.py
new file mode 100644
index 0000000..8f80b74
--- /dev/null
+++ b/scripts/staging/slicing/utils/model_generator.py
@@ -0,0 +1,77 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import pandas as pd
+from sklearn import linear_model
+from sklearn.datasets import make_regression
+import numpy as np
+from sklearn.metrics import mean_squared_error, r2_score
+
+
+def generate_dataset(beta, n):
+    """Generate a noise-free toy regression dataset and save it as CSV.
+
+    Draws ``n`` samples of three integer features, each taken uniformly from
+    {1, 2, 3}, and computes the target as ``y = 2*x0 + 3*x1 + 1*x2``.
+    Features and target are written side by side to "toy_train.csv" in the
+    current working directory.
+
+    Args:
+        beta: Unused; kept so that existing callers keep working.
+        n: Number of samples (rows) to generate.
+
+    Returns:
+        Tuple ``(x, y)`` where ``x`` has shape (n, 3) and ``y`` has shape (n, 1).
+    """
+    # randint's upper bound is exclusive, so 4 yields values in {1, 2, 3};
+    # this replaces the deprecated np.random.random_integers(1, 3, ...).
+    x = np.random.randint(1, 4, (n, 3))
+    # Fixed ground-truth coefficients of the linear relation.
+    coeff = [2, 3, 1]
+    coeff_mat = np.array(coeff).reshape(len(coeff), 1)
+    y = np.matmul(x, coeff_mat)
+    dataset = np.append(x, y, axis=1)
+    np.savetxt("toy_train.csv", dataset, delimiter=",", fmt='%s')
+    return x, y
+
+
+def main():
+    """Train a linear model on generated toy data and evaluate it on toy.csv.
+
+    Generates 100 training samples with ``generate_dataset`` and fits a
+    ``LinearRegression`` model on them. It then predicts on
+    "../datasets/toy.csv" (path relative to the current working directory;
+    the last column is taken as the target) and prints the mean squared
+    error and the r^2 statistic.
+
+    Returns:
+        Tuple ``(model, x_test, y_test, y_pred)``.
+    """
+    # generate_dataset already saves the samples to "toy_train.csv", so the
+    # file is not written a second time here.
+    x_train, y_train = generate_dataset(10, 100)
+    model = linear_model.LinearRegression()
+
+    # Train the model using the training data that we created
+    model.fit(x_train, y_train)
+    # Print the coefficients the model has learned
+    print('Coefficients: \n', model.coef_)
+
+    # We then use the model to make predictions based on the test values of x
+    test_dataset = pd.read_csv("../datasets/toy.csv")
+    attributes_amount = len(test_dataset.values[0])
+    y_test = test_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    x_test = test_dataset.iloc[:, 0:attributes_amount - 1].values
+    y_pred = model.predict(x_test)
+
+    # Now, we can calculate the model's accuracy metrics based on what the actual value of y was
+    print("Mean squared error: %.2f"
+          % mean_squared_error(y_test, y_pred))
+    print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
+    # Output recorded by the original author:
+    # Coefficients: [[2. 3. 1.]]
+    # Mean squared error: 3.00
+    # r_2 statistic: 0.72
+
+    return model, x_test, y_test, y_pred
+
+
+if __name__ == '__main__':
+    # Previously main() was only defined under this guard and never invoked,
+    # so running the script did nothing.
+    main()
+
+
+
+