You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by GitBox <gi...@apache.org> on 2022/04/08 18:58:55 UTC
[GitHub] [incubator-wayang] github-actions[bot] opened a new issue, #144: maybe would be better just leave the number as key
github-actions[bot] opened a new issue, #144:
URL: https://github.com/apache/incubator-wayang/issues/144
maybe would be better just leave the number as key
udf=func,
Nevertheless, current configuration runs it over Java
To execute the plan directly in the program driver
graph.print_adjlist()
Separates Python Plan into a List of Pipelines
UDF always will receive:
x: a Node object,
y: an object representing the result of the last iteration,
z: a collection to store final results inside your UDF
writer.write_message(self.descriptor)
https://github.com/apache/incubator-wayang/blob/4cc0bfdca06dda171b661822daae5fa438d9d475/python/old_code/orchestrator/dataquanta.py#L150
```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from old_code.orchestrator.operator import Operator
from old_code.old_graph.graph import Graph
from old_code.old_graph.traversal import Traversal
from old_code.protobuf.planwriter import MessageWriter
import itertools
import collections
import logging
from functools import reduce
import operator
# Wraps a Source operation to create an iterable
class DataQuantaBuilder:
def __init__(self, descriptor):
self.descriptor = descriptor
def source(self, source):
if type(source) is str:
source_ori = open(source, "r")
else:
source_ori = source
return DataQuanta(
Operator(
operator_type="source",
udf=source,
iterator=iter(source_ori),
previous=[],
python_exec=False
),
descriptor=self.descriptor
)
# Wraps an operation over an iterable
class DataQuanta:
def __init__(self, operator=None, descriptor=None):
self.operator = operator
self.descriptor = descriptor
if self.operator.is_source():
self.descriptor.add_source(self.operator)
if self.operator.is_sink():
self.descriptor.add_sink(self.operator)
# Operational Functions
def filter(self, udf):
def func(iterator):
return filter(udf, iterator)
return DataQuanta(
Operator(
operator_type="filter",
udf=func,
previous=[self.operator],
python_exec=True
),
descriptor=self.descriptor
)
def flatmap(self, udf):
def auxfunc(iterator):
return itertools.chain.from_iterable(map(udf, iterator))
def func(iterator):
mapped = map(udf, iterator)
flattened = flatten_single_dim(mapped)
yield from flattened
def flatten_single_dim(mapped):
for item in mapped:
for subitem in item:
yield subitem
return DataQuanta(
Operator(
operator_type="flatmap",
udf=func,
previous=[self.operator],
python_exec=True
),
descriptor=self.descriptor
)
def group_by(self, udf):
def func(iterator):
# TODO key should be given by "udf"
return itertools.groupby(iterator, key=operator.itemgetter(0))
#return itertools.groupby(sorted(iterator), key=itertools.itemgetter(0))
return DataQuanta(
Operator(
operator_type="group_by",
udf=func,
previous=[self.operator],
python_exec=True
),
descriptor=self.descriptor
)
def map(self, udf):
def func(iterator):
return map(udf, iterator)
return DataQuanta(
Operator(
operator_type="map",
udf=func,
previous=[self.operator],
python_exec=True
),
descriptor=self.descriptor
)
# Key specifies pivot dimensions
# UDF specifies reducer function
def reduce_by_key(self, keys, udf):
op = Operator(
operator_type="reduce_by_key",
udf=udf,
previous=[self.operator],
python_exec=False
)
#print(len(keys), keys)
for i in range(0, len(keys)):
"""if keys[i] is int:
op.set_parameter("vector_position|"+str(i), keys[i])
else:
op.set_parameter("dimension_key|"+str(i), keys[i])"""
# TODO maybe would be better just leave the number as key
op.set_parameter("dimension|"+str(i+1), keys[i])
return DataQuanta(
op,
descriptor=self.descriptor
)
def reduce(self, udf):
def func(iterator):
return reduce(udf, iterator)
return DataQuanta(
Operator(
operator_type="reduce",
udf=func,
previous=[self.operator],
python_exec=True
),
descriptor=self.descriptor
)
def sink(self, path, end="\n"):
def consume(iterator):
with open(path, 'w') as f:
for x in iterator:
f.write(str(x) + end)
def func(iterator):
consume(iterator)
# return self.__run(consume)
return DataQuanta(
Operator(
operator_type="sink",
udf=path,
# To execute directly uncomment
# udf=func,
previous=[self.operator],
python_exec=False
),
descriptor=self.descriptor
)
def sort(self, udf):
def func(iterator):
return sorted(iterator, key=udf)
return DataQuanta(
Operator(
operator_type="sort",
udf=func,
previous=[self.operator],
python_exec=True
),
descriptor=self.descriptor
)
# This function allow the union to be performed by Python
# Nevertheless, current configuration runs it over Java
def union(self, other):
def func(iterator):
return itertools.chain(iterator, other.operator.getIterator())
return DataQuanta(
Operator(
operator_type="union",
udf=func,
previous=[self.operator, other.operator],
python_exec=False
),
descriptor=self.descriptor
)
def __run(self, consumer):
consumer(self.operator.getIterator())
# Execution Functions
def console(self, end="\n"):
def consume(iterator):
for x in iterator:
print(x, end=end)
self.__run(consume)
# Only for debugging purposes!
# To execute the plan directly in the program driver
def execute(self):
logging.warn("DEBUG Execution")
logging.info("Reminder to swap SINK UDF value from path to func")
logging.debug(self.operator.previous[0].operator_type)
if self.operator.is_sink():
logging.debug(self.operator.operator_type)
logging.debug(self.operator.udf)
logging.debug(len(self.operator.previous))
self.operator.udf(self.operator.previous[0].getIterator())
else:
logging.error("Plan must call execute from SINK type of operator")
raise RuntimeError
# Converts Python Functional Plan to valid Wayang Plan
def to_wayang_plan(self):
sinks = self.descriptor.get_sinks()
if len(sinks) == 0:
return
graph = Graph()
graph.populate(self.descriptor.get_sinks())
# Uncomment to check the Graph built
# graph.print_adjlist()
# Function to be consumed by Traverse
# Separates Python Plan into a List of Pipelines
def define_pipelines(node1, current_pipeline, collection):
def store_unique(pipe_to_insert):
for pipe in collection:
if equivalent_lists(pipe, pipe_to_insert):
return
collection.append(pipe_to_insert)
def equivalent_lists(l1, l2):
if collections.Counter(l1) == collections.Counter(l2):
return True
else:
return False
if not current_pipeline:
current_pipeline = [node1]
elif node1.operator.is_boundary():
store_unique(current_pipeline.copy())
current_pipeline.clear()
current_pipeline.append(node1)
else:
current_pipeline.append(node1)
if node1.operator.sink:
store_unique(current_pipeline.copy())
current_pipeline.clear()
return current_pipeline
# Works over the graph
trans = Traversal(
graph=graph,
origin=self.descriptor.get_sources(),
# udf=lambda x, y, z: d(x, y, z)
# UDF always will receive:
# x: a Node object,
# y: an object representing the result of the last iteration,
# z: a collection to store final results inside your UDF
udf=lambda x, y, z: define_pipelines(x, y, z)
)
# Gets the results of the traverse process
collected_stages = trans.get_collected_data()
# Passing the Stages to a Wayang message writer
writer = MessageWriter()
a = 0
# Stage is composed of class Node objects
for stage in collected_stages:
a += 1
logging.info("///")
logging.info("stage" + str(a))
writer.process_pipeline(stage)
writer.set_dependencies()
# Uses a file to provide the plan
# writer.write_message(self.descriptor)
# Send the plan to Wayang REST api directly
writer.send_message(self.descriptor)
```
29b7cf7a2a05abd6b06329f92725e3722568951a
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@wayang.apache.org.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org