You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@systemml.apache.org by GitBox <gi...@apache.org> on 2020/04/08 18:03:38 UTC

[GitHub] [systemml] gilgenbergg opened a new pull request #881: spark wip for review

gilgenbergg opened a new pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407233247
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_utils.py
 ##########
 @@ -0,0 +1,122 @@
+from pyspark.sql.functions import udf
+from pyspark.sql.types import FloatType
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import opt_fun, union
+
+calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, prediction, type), FloatType())
+model_type_init = udf(lambda type: init_model_type(type))
+
+
+def calc_loss_fun(target, prediction, type):
+    if type == 0:
+        return (prediction - target) ** 2
+    elif type == 1:
+        if target == prediction:
+            return float(1)
+        else:
+            return float(0)
+
+
+def init_model_type(model_type):
+    if model_type == "regression":
+        return 0
+    elif model_type == "classification":
+        return 1
+
+
+def slice_join_nonsense(node_i, node_j, cur_lvl):
+    commons = 0
+    for attr1 in node_i.attributes:
+        for attr2 in node_j.attributes:
 
 Review comment:
   Since the attributes are sorted, we can replace the nested loop join with a merge join that performs a sorted set intersection.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] gilgenbergg commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
gilgenbergg commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407368006
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
+        if not flag:
+            new_node = SparkedNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check if concrete data should be extracted or not (only for those that have score upper
+            # and if size of subset is big enough
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w)
+                # we decide to add node to current level nodes (in order to make new combinations
+                # on the next one or not basing on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    nodes.append(new_node)
+                    top_k.add_new_top_slice(new_node)
+                elif new_node.check_bounds(top_k, x_size, alpha):
+                    nodes.append(new_node)
 
 Review comment:
   Thank you for comment @mboehm7 and for noticing this useless additions (that currently also take place in base implementation as well). 
   
   The algorithm is as follows:
   1) depending on lower and upper bounds we decide if we extract data of the slice or not.
   2a) data was extracted -> we can compute concrete metrics of size, error and score. If its score is high enough, we add the slice for next level enumeration, additionally checking if it can be included in list of top-k items.
   2b) data was not extracted -> we only decide to include slice for next level enumeration or not basing on its bounds.
   
   Will be fixed in next commit!
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407232599
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_union_slicer.py
 ##########
 @@ -0,0 +1,50 @@
+from pyspark import SparkContext
+
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    all_features = list(all_features)
+    first_level = {}
+    first_tasks = sc.parallelize(all_features)
+    SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(lambda features: sparked_utils.make_first_level(features, predictions, loss, top_k,
+                                                        alpha, k, w, loss_type)).map(lambda node: (node.key, node)).collect()
+    first_level.update(init_slices)
+    update_top_k(first_level, top_k, alpha, predictions)
+    SparkContext.broadcast(sc, top_k)
+    SparkContext.broadcast(sc, first_level)
+    levels.append(first_level)
+    cur_lvl = 1
+    top_k.print_topk()
+    SparkContext.broadcast(sc, top_k)
+    while len(levels[cur_lvl - 1]) > 0:
+        cur_lvl_res = {}
+        for left in range(int(cur_lvl / 2) + 1):
+            right = cur_lvl - left - 1
+            partitions = sc.parallelize(levels[left].values())
+            mapped = partitions.mapPartitions(lambda nodes: sparked_utils.nodes_enum(nodes, levels[right].values(), predictions,loss,
 
 Review comment:
   Would be possible to only broadcast the nodes that have been created in the last iteration, as all other levels have already been broadcast before?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407232363
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
+        if not flag:
+            new_node = SparkedNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check if concrete data should be extracted or not (only for those that have score upper
+            # and if size of subset is big enough
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w)
+                # we decide to add node to current level nodes (in order to make new combinations
+                # on the next one or not basing on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    nodes.append(new_node)
+                    top_k.add_new_top_slice(new_node)
+                elif new_node.check_bounds(top_k, x_size, alpha):
+                    nodes.append(new_node)
+            else:
+                if new_node.check_bounds(top_k, x_size, alpha):
+                        nodes.append(new_node)
+            if debug:
+                new_node.print_debug(top_k, cur_lvl)
+    return nodes
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    first_level = {}
+    all_features = list(all_features)
+    first_tasks = sc.parallelize(all_features)
+    SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(
+        lambda features: sparked_utils.make_first_level(features, predictions, loss, top_k,
+                                                        alpha, k, w, loss_type)).map(
+        lambda node: (node.key, node)).collect()
+    first_level.update(init_slices)
+    update_top_k(first_level, top_k, alpha, predictions)
+    SparkContext.broadcast(sc, top_k)
+    SparkContext.broadcast(sc, first_level)
+    levels.append(first_level)
+    cur_lvl = cur_lvl + 1
+    top_k.print_topk()
+    SparkContext.broadcast(sc, top_k)
+    # checking the first partition of level. if not empty then processing otherwise no elements were added to this level
+    while len(levels[cur_lvl - 1]) > 0:
+        nodes_list = {}
+        partitions = sc.parallelize(levels[cur_lvl - 1].values())
+        mapped = partitions.mapPartitions(
+            lambda nodes: sparked_utils.nodes_enum(nodes, levels[cur_lvl - 1].values(), predictions, loss,
+                                                   top_k, alpha, k, w, loss_type, cur_lvl, debug, enumerator))
+        flattened = mapped.flatMap(lambda node: node)
+        nodes_list.update(flattened.map(lambda node: (node.key, node)).reduceByKey(lambda x, y: x).collect())
+        SparkContext.broadcast(sc, nodes_list)
+        levels.append(nodes_list)
+        update_top_k(nodes_list, top_k, alpha, predictions)
+        SparkContext.broadcast(sc, top_k)
 
 Review comment:
   Also if this broadcast is only needed for the next iteration, rather do it before it is used to avoid redundancy of statements before the loop.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407230983
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
 
 Review comment:
   In general, 'sparked' sounds a bit odd - maybe just replace all appearances with 'spark'

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407231975
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
+        if not flag:
+            new_node = SparkedNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check if concrete data should be extracted or not (only for those that have score upper
+            # and if size of subset is big enough
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w)
+                # we decide to add node to current level nodes (in order to make new combinations
+                # on the next one or not basing on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    nodes.append(new_node)
+                    top_k.add_new_top_slice(new_node)
+                elif new_node.check_bounds(top_k, x_size, alpha):
+                    nodes.append(new_node)
+            else:
+                if new_node.check_bounds(top_k, x_size, alpha):
+                        nodes.append(new_node)
+            if debug:
+                new_node.print_debug(top_k, cur_lvl)
+    return nodes
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    first_level = {}
+    all_features = list(all_features)
+    first_tasks = sc.parallelize(all_features)
+    SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(
+        lambda features: sparked_utils.make_first_level(features, predictions, loss, top_k,
+                                                        alpha, k, w, loss_type)).map(
+        lambda node: (node.key, node)).collect()
+    first_level.update(init_slices)
+    update_top_k(first_level, top_k, alpha, predictions)
+    SparkContext.broadcast(sc, top_k)
+    SparkContext.broadcast(sc, first_level)
+    levels.append(first_level)
+    cur_lvl = cur_lvl + 1
+    top_k.print_topk()
+    SparkContext.broadcast(sc, top_k)
+    # checking the first partition of level. if not empty then processing otherwise no elements were added to this level
+    while len(levels[cur_lvl - 1]) > 0:
+        nodes_list = {}
+        partitions = sc.parallelize(levels[cur_lvl - 1].values())
+        mapped = partitions.mapPartitions(
+            lambda nodes: sparked_utils.nodes_enum(nodes, levels[cur_lvl - 1].values(), predictions, loss,
+                                                   top_k, alpha, k, w, loss_type, cur_lvl, debug, enumerator))
+        flattened = mapped.flatMap(lambda node: node)
+        nodes_list.update(flattened.map(lambda node: (node.key, node)).reduceByKey(lambda x, y: x).collect())
 
 Review comment:
   Could we maybe accomplish the same with `distinct` instead of `reduceByKey`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407234616
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_utils.py
 ##########
 @@ -0,0 +1,122 @@
+from pyspark.sql.functions import udf
+from pyspark.sql.types import FloatType
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import opt_fun, union
+
+calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, prediction, type), FloatType())
+model_type_init = udf(lambda type: init_model_type(type))
+
+
+def calc_loss_fun(target, prediction, type):
+    if type == 0:
+        return (prediction - target) ** 2
+    elif type == 1:
+        if target == prediction:
+            return float(1)
+        else:
+            return float(0)
+
+
+def init_model_type(model_type):
+    if model_type == "regression":
+        return 0
+    elif model_type == "classification":
+        return 1
+
+
+def slice_join_nonsense(node_i, node_j, cur_lvl):
+    commons = 0
+    for attr1 in node_i.attributes:
+        for attr2 in node_j.attributes:
+            if attr1 == attr2:
+                commons = commons + 1
+    return commons != cur_lvl - 1
+
+
+def make_first_level(features, predictions, loss, top_k, alpha, k, w, loss_type):
+    first_level = []
+    # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
+    for feature in features:
+        new_node = SparkedNode(loss, predictions)
+        new_node.parents = [feature]
+        new_node.attributes.append(feature)
+        new_node.name = new_node.make_name()
+        new_node.key = new_node.make_key()
+        new_node.process_slice(loss_type)
+        new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
+        new_node.c_upper = new_node.score
+        first_level.append(new_node)
+        new_node.print_debug(top_k, 0)
+        # constraints for 1st level nodes to be problematic candidates
+        #if new_node.check_constraint(top_k, len(predictions), alpha):
+            # this method updates top k slices if needed
+            #top_k.add_new_top_slice(new_node)
+    return first_level
+
+
+def slice_union_nonsense(node_i, node_j):
+    flag = False
+    for attr1 in node_i.attributes:
+        for attr2 in node_j.attributes:
+            if attr1 == attr2:
+                # there are common attributes which is not the case we need
+                flag = True
+                break
 
 Review comment:
   The break only terminates the inner loop, but the outer continues. So, I would recommend to simply put a `return True`, along with a `return False` at the end of the method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407234720
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_utils.py
 ##########
 @@ -0,0 +1,122 @@
+from pyspark.sql.functions import udf
+from pyspark.sql.types import FloatType
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import opt_fun, union
+
+calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, prediction, type), FloatType())
+model_type_init = udf(lambda type: init_model_type(type))
+
+
+def calc_loss_fun(target, prediction, type):
+    if type == 0:
+        return (prediction - target) ** 2
+    elif type == 1:
+        if target == prediction:
+            return float(1)
+        else:
+            return float(0)
+
+
+def init_model_type(model_type):
+    if model_type == "regression":
+        return 0
+    elif model_type == "classification":
+        return 1
+
+
+def slice_join_nonsense(node_i, node_j, cur_lvl):
+    commons = 0
+    for attr1 in node_i.attributes:
+        for attr2 in node_j.attributes:
+            if attr1 == attr2:
+                commons = commons + 1
+    return commons != cur_lvl - 1
+
+
+def make_first_level(features, predictions, loss, top_k, alpha, k, w, loss_type):
+    first_level = []
+    # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
+    for feature in features:
+        new_node = SparkedNode(loss, predictions)
+        new_node.parents = [feature]
+        new_node.attributes.append(feature)
+        new_node.name = new_node.make_name()
+        new_node.key = new_node.make_key()
+        new_node.process_slice(loss_type)
+        new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
+        new_node.c_upper = new_node.score
+        first_level.append(new_node)
+        new_node.print_debug(top_k, 0)
+        # constraints for 1st level nodes to be problematic candidates
+        #if new_node.check_constraint(top_k, len(predictions), alpha):
+            # this method updates top k slices if needed
+            #top_k.add_new_top_slice(new_node)
+    return first_level
+
+
+def slice_union_nonsense(node_i, node_j):
 
 Review comment:
   Maybe find better names for these methods (drop the 'nonsense' part)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407231698
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
+        if not flag:
+            new_node = SparkedNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check if concrete data should be extracted or not (only for those that have score upper
+            # and if size of subset is big enough
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w)
+                # we decide to add node to current level nodes (in order to make new combinations
+                # on the next one or not basing on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    nodes.append(new_node)
+                    top_k.add_new_top_slice(new_node)
+                elif new_node.check_bounds(top_k, x_size, alpha):
+                    nodes.append(new_node)
+            else:
+                if new_node.check_bounds(top_k, x_size, alpha):
+                        nodes.append(new_node)
+            if debug:
+                new_node.print_debug(top_k, cur_lvl)
+    return nodes
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    first_level = {}
+    all_features = list(all_features)
+    first_tasks = sc.parallelize(all_features)
+    SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(
+        lambda features: sparked_utils.make_first_level(features, predictions, loss, top_k,
+                                                        alpha, k, w, loss_type)).map(
 
 Review comment:
   the formatting is a bit odd - try to indent it as follows:
   ```
   <some computation>
       .map(<lambda 1>)
       .map(<lambda 2>)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407233029
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_utils.py
 ##########
 @@ -0,0 +1,122 @@
+from pyspark.sql.functions import udf
+from pyspark.sql.types import FloatType
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import opt_fun, union
+
+calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, prediction, type), FloatType())
+model_type_init = udf(lambda type: init_model_type(type))
+
+
+def calc_loss_fun(target, prediction, type):
+    if type == 0:
+        return (prediction - target) ** 2
+    elif type == 1:
+        if target == prediction:
+            return float(1)
+        else:
+            return float(0)
 
 Review comment:
   we could replace these 4 lines with return `return float(target == prediction)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407234906
 
 

 ##########
 File path: scripts/staging/slicing/tests/classification/sparked_adults.py
 ##########
 @@ -0,0 +1,97 @@
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.classification import RandomForestClassifier
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+
+from slicing.sparked import sparked_utils, sparked_slicer, sparked_union_slicer
+
+
+ten_binner = sf.udf(lambda arg: int(arg / 10))
+fnlwgt_binner = sf.udf(lambda arg: int(arg // 100000))
+edu_binner = sf.udf(lambda arg: int(arg / 5))
+cap_gain_binner = sf.udf(lambda arg: int(arg / 1000))
+
+conf = SparkConf().setAppName("adults_test").setMaster('local[2]')
+num_partitions = 2
+model_type = "classification"
+label = 'Income'
+sparkContext = SparkContext(conf=conf)
+sqlContext = SQLContext(sparkContext)
+dataset_df = sqlContext.read.csv('/slicing/datasets/adult.csv', header='true', inferSchema='true')
 
 Review comment:
   Related to these tests, it would be good to start creating versioned experimental results in our paper repo.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] j143 edited a comment on issue #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
j143 edited a comment on issue #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#issuecomment-611857416
 
 
   Hi @gilgenbergg - PRs would most likely be attended to, when description https://github.com/apache/systemml/pull/836#issue-222288089 is provided.
   
   Along with any of the design documents 
   1. google docs, 
   2. mail list, 
   3. screeshots,
   4.  terminal outputs
   5. check list
   6. Notebook (kaggle link, colab etc.)
   
   >Also, feel free to ping the person, you've had discussion with. This would act as the PR will act as a documentation for the future contributors. 
   
   I will delete this comment, once seen to keep this discussion upto the point. 
   
   >If no one attends this by April 11, 2020 (6 AM london time)
   
   Thank you.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] j143 removed a comment on issue #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
j143 removed a comment on issue #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#issuecomment-611857416
 
 
   Hi @gilgenbergg - PRs would most likely be attended to, when description https://github.com/apache/systemml/pull/836#issue-222288089 is provided.
   
   Along with any of the design documents 
   1. google docs, 
   2. mail list, 
   3. screeshots,
   4.  terminal outputs
   5. check list
   6. Notebook (kaggle link, colab etc.)
   
   >Also, feel free to ping the person, you've had discussion with. This would act as the PR will act as a documentation for the future contributors. 
   
   I will delete this comment, once seen to keep this discussion upto the point. 
   
   >If no one attends this by April 11, 2020 (6 AM london time)
   
   Thank you.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407232217
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
+        if not flag:
+            new_node = SparkedNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check if concrete data should be extracted or not (only for those that have score upper
+            # and if size of subset is big enough
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w)
+                # we decide to add node to current level nodes (in order to make new combinations
+                # on the next one or not basing on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    nodes.append(new_node)
+                    top_k.add_new_top_slice(new_node)
+                elif new_node.check_bounds(top_k, x_size, alpha):
+                    nodes.append(new_node)
+            else:
+                if new_node.check_bounds(top_k, x_size, alpha):
+                        nodes.append(new_node)
+            if debug:
+                new_node.print_debug(top_k, cur_lvl)
+    return nodes
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    first_level = {}
+    all_features = list(all_features)
+    first_tasks = sc.parallelize(all_features)
+    SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(
+        lambda features: sparked_utils.make_first_level(features, predictions, loss, top_k,
+                                                        alpha, k, w, loss_type)).map(
+        lambda node: (node.key, node)).collect()
+    first_level.update(init_slices)
+    update_top_k(first_level, top_k, alpha, predictions)
+    SparkContext.broadcast(sc, top_k)
+    SparkContext.broadcast(sc, first_level)
+    levels.append(first_level)
+    cur_lvl = cur_lvl + 1
+    top_k.print_topk()
+    SparkContext.broadcast(sc, top_k)
 
 Review comment:
   Why are the broadcasts not assigned to a broadcast variable and later passed into the functions? It looks like we create the broadcast but pass the data structure `top_k` as is into the task closure. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407232234
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
+        if not flag:
+            new_node = SparkedNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check if concrete data should be extracted or not (only for those that have score upper
+            # and if size of subset is big enough
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w)
+                # we decide to add node to current level nodes (in order to make new combinations
+                # on the next one or not basing on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    nodes.append(new_node)
+                    top_k.add_new_top_slice(new_node)
+                elif new_node.check_bounds(top_k, x_size, alpha):
+                    nodes.append(new_node)
+            else:
+                if new_node.check_bounds(top_k, x_size, alpha):
+                        nodes.append(new_node)
+            if debug:
+                new_node.print_debug(top_k, cur_lvl)
+    return nodes
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    first_level = {}
+    all_features = list(all_features)
+    first_tasks = sc.parallelize(all_features)
+    SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(
+        lambda features: sparked_utils.make_first_level(features, predictions, loss, top_k,
+                                                        alpha, k, w, loss_type)).map(
+        lambda node: (node.key, node)).collect()
+    first_level.update(init_slices)
+    update_top_k(first_level, top_k, alpha, predictions)
+    SparkContext.broadcast(sc, top_k)
+    SparkContext.broadcast(sc, first_level)
+    levels.append(first_level)
+    cur_lvl = cur_lvl + 1
+    top_k.print_topk()
+    SparkContext.broadcast(sc, top_k)
+    # checking the first partition of level. if not empty then processing otherwise no elements were added to this level
+    while len(levels[cur_lvl - 1]) > 0:
+        nodes_list = {}
+        partitions = sc.parallelize(levels[cur_lvl - 1].values())
+        mapped = partitions.mapPartitions(
+            lambda nodes: sparked_utils.nodes_enum(nodes, levels[cur_lvl - 1].values(), predictions, loss,
+                                                   top_k, alpha, k, w, loss_type, cur_lvl, debug, enumerator))
+        flattened = mapped.flatMap(lambda node: node)
+        nodes_list.update(flattened.map(lambda node: (node.key, node)).reduceByKey(lambda x, y: x).collect())
+        SparkContext.broadcast(sc, nodes_list)
+        levels.append(nodes_list)
+        update_top_k(nodes_list, top_k, alpha, predictions)
+        SparkContext.broadcast(sc, top_k)
 
 Review comment:
   same as above

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] mboehm7 commented on a change in pull request #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
mboehm7 commented on a change in pull request #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#discussion_r407231498
 
 

 ##########
 File path: scripts/staging/slicing/sparked/sparked_slicer.py
 ##########
 @@ -0,0 +1,88 @@
+from pyspark import SparkContext
+
+from slicing.base.SparkedNode import SparkedNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.sparked import sparked_utils
+from slicing.sparked.sparked_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = sparked_utils.slice_join_nonsense(node_i, node_a, cur_lvl)
+        if not flag:
+            new_node = SparkedNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check if concrete data should be extracted or not (only for those that have score upper
+            # and if size of subset is big enough
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w)
+                # we decide to add node to current level nodes (in order to make new combinations
+                # on the next one or not basing on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys:
+                    nodes.append(new_node)
+                    top_k.add_new_top_slice(new_node)
+                elif new_node.check_bounds(top_k, x_size, alpha):
+                    nodes.append(new_node)
 
 Review comment:
   So we add the new node even though it did not pass the check constraint. Where do we distinguish between nodes that are terminated, and nodes that need to be investigated?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [systemml] j143 commented on issue #881: spark wip for review

Posted by GitBox <gi...@apache.org>.
j143 commented on issue #881: spark wip for review
URL: https://github.com/apache/systemml/pull/881#issuecomment-611857416
 
 
   Hi @gilgenbergg - PRs would most likely be attended to, when description https://github.com/apache/systemml/pull/836#issue-222288089 is provided.
   
   Along with any of the design documents 
   1. google docs, 
   2. mail list, 
   3. screeshots,
   4.  terminal outputs
   5. check list
   
   >Also, feel free to ping the person, you've had discussion with. This would act as the PR will act as a documentation for the future contributors. 
   
   I will delete this comment, once seen to keep this discussion upto the point. 
   
   >If no one attends this by April 11, 2020 (6 AM london time)
   
   Thank you.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services