You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by GitBox <gi...@apache.org> on 2022/06/19 22:23:51 UTC

[GitHub] [incubator-wayang] github-actions[bot] opened a new issue, #251: ADD id to executions

github-actions[bot] opened a new issue, #251:
URL: https://github.com/apache/incubator-wayang/issues/251

   ADD id to executions
   
   https://github.com/apache/incubator-wayang/blob/d859a97d43a8c3c3c964150eaff8f3833e41ea75/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java#L68
   
   ```java
   
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package org.apache.wayang.api.rest.server.spring.general;
   
   import com.google.protobuf.ByteString;
   import org.apache.wayang.api.python.function.WrappedPythonFunction;
   import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
   import org.apache.wayang.basic.operators.*;
   import org.apache.wayang.commons.serializable.OperatorProto;
   import org.apache.wayang.commons.serializable.PlanProto;
   import org.apache.wayang.core.api.WayangContext;
   import org.apache.wayang.core.api.exception.WayangException;
   import org.apache.wayang.core.function.MapPartitionsDescriptor;
   import org.apache.wayang.core.plan.wayangplan.OperatorBase;
   import org.springframework.web.bind.annotation.GetMapping;
   import org.springframework.web.bind.annotation.PostMapping;
   import org.springframework.web.bind.annotation.RequestParam;
   import org.springframework.web.bind.annotation.RestController;
   
   import java.io.File;
   import java.io.FileInputStream;
   import java.io.FileNotFoundException;
   import java.io.IOException;
   import java.net.MalformedURLException;
   import java.net.URI;
   import java.net.URISyntaxException;
   import java.net.URL;
   import java.nio.file.Paths;
   import java.util.*;
   import java.util.stream.Collectors;
   
   import org.apache.wayang.core.plan.wayangplan.WayangPlan;
   import org.apache.wayang.java.Java;
   import org.apache.wayang.spark.Spark;
   
   import org.apache.wayang.commons.serializable.WayangPlanProto;
   import org.springframework.web.multipart.MultipartFile;
   
   
   @RestController
   public class WayangController {
   
       /**
        * Builds a Wayang plan from the serialized message stored in
        * {@code protobuf/wayang_message} under the real path of the working directory
        * and executes it.
        *
        * <p>An {@link IOException} while opening, reading or closing the file is printed
        * to stderr; the response is the same in either case.
        *
        * @return the fixed string {@code "Builder works"}
        */
       @GetMapping("/plan/create/fromfile")
       public String planFromFile(
               //@RequestParam("file") MultipartFile file
       ){
   
           /* try-with-resources: the stream is closed even when building or executing the plan fails */
           try (FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message")) {
               WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);
   
               /*TODO ADD id to executions*/
               wpb.getWayangContext().execute(wpb.getWayangPlan());
   
           } catch (IOException e) {
               e.printStackTrace();
           }
   
           return "Builder works";
       }
   
       /**
        * Hands the {@code message} request parameter to a {@link WayangPlanBuilder}
        * and executes the plan the builder produces on the builder's own context.
        *
        * @param message plan payload passed unchanged to the builder
        * @return always the empty string
        */
       @PostMapping("/plan/create")
       public String planFromMessage(@RequestParam("message") String message) {
           final WayangPlanBuilder planBuilder = new WayangPlanBuilder(message);
   
           /*TODO ADD id to executions*/
           planBuilder.getWayangContext().execute(planBuilder.getWayangPlan());
   
           final String emptyResponse = "";
           return emptyResponse;
       }
   
       /**
        * Reads the protobuf plan stored in {@code protobuf/wayang_message} under the real
        * path of the working directory, converts it with {@code buildContext} and
        * {@code buildPlan}, prints the resulting plan and executes it.
        *
        * @return {@code "Works!"} after the plan was executed, {@code "Not working"} when
        *         an {@link IOException} occurred (its stack trace is printed to stderr)
        */
       @GetMapping("/")
       public String all(){
           System.out.println("detected!");
   
           /* try-with-resources: the stream is closed on every path, including the early return */
           try (FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message")) {
               WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);
   
               WayangContext wc = buildContext(plan);
               WayangPlan wp = buildPlan(plan);
   
               System.out.println("Plan!");
               System.out.println(wp.toString());
   
               wc.execute(wp);
               return("Works!");
   
           } catch (IOException e) {
               /* Also covers FileNotFoundException, which is an IOException */
               e.printStackTrace();
           }
   
           return "Not working";
       }
   
       /**
        * Creates the context used to execute a decoded plan.
        *
        * <p>The {@code plan} argument is currently not consulted: the Spark basic plugin is
        * always the only plugin registered.
        *
        * @param plan decoded protobuf plan (unused for now)
        * @return a fresh context with the Spark basic plugin registered
        */
       private WayangContext buildContext(WayangPlanProto plan){
           final WayangContext context = new WayangContext();
   
           /*
            * Platform selection driven by the message is disabled; the draft was:
            *
            *   plan.getContext().getPlatformsList().forEach(platform -> {
            *       if (platform.getNumber() == 0)
            *           ctx.with(Java.basicPlugin());
            *       else if (platform.getNumber() == 1)
            *           ctx.with(Spark.basicPlugin());
            *   });
            */
           context.with(Spark.basicPlugin());
           return context;
       }
   
       /**
        * Translates the operator graph described by {@code plan} into a {@link WayangPlan}.
        *
        * <p>Operators are taken from a work queue that is seeded with the plan's sources.
        * A source is instantiated immediately; any other operator is instantiated once all
        * of its predecessors have been instantiated, and is otherwise moved to the end of
        * the queue. After an operator is instantiated, its successors (looked up by id in
        * the plan's operator list and sink list) are appended to the queue.
        *
        * @param plan decoded protobuf plan
        * @return a plan built from the first sink that was connected to a predecessor
        * @throws WayangException if the queued operators can never be connected because a
        *         predecessor is never instantiated, or if no sink was connected
        */
       private WayangPlan buildPlan(WayangPlanProto plan){
   
           System.out.println(plan);
   
           PlanProto planProto = plan.getPlan();
           LinkedList<OperatorProto> protoList = new LinkedList<>();
           planProto.getSourcesList().forEach(protoList::addLast);
   
           Map<String, OperatorBase> operators = new HashMap<>();
           List<OperatorBase> sinks = new ArrayList<>();
   
           /* Operators pushed back in a row since the last operator was instantiated */
           int deferredInARow = 0;
   
           while(! protoList.isEmpty()) {
   
               OperatorProto proto = protoList.pollFirst();
   
               /* Operators should not be processed twice: drop the duplicate instead of
                * re-queueing it, which would keep the loop running forever */
               if (operators.containsKey(proto.getId())) {
                   continue;
               }
   
               /* Checking if protoOperator can be connected to the current WayangPlan:
                * sources always can, everything else needs ALL predecessors processed */
               boolean processIt = true;
               if (!proto.getType().equals("source")) {
                   for (String predecessor : proto.getPredecessorsList()) {
                       if (!operators.containsKey(predecessor)) {
                           processIt = false;
                           break;
                       }
                   }
               }
   
               if (!processIt) {
                   /* In case we cannot process it yet, It must be added again at the end */
                   protoList.addLast(proto);
                   deferredInARow++;
   
                   /* Every queued operator was deferred without any progress in between,
                    * so none of them can ever become ready */
                   if (deferredInARow >= protoList.size()) {
                       throw new WayangException(
                               "Plan cannot be built: " + protoList.size()
                                       + " operator(s) wait for predecessors that are never created, e.g. operator "
                                       + proto.getId());
                   }
                   continue;
               }
               deferredInARow = 0;
   
               /* Create and store Wayang operator */
               OperatorBase operator = createOperatorByType(proto);
               operators.put(proto.getId(), operator);
   
               /*TODO Connect with predecessors requires more details in connection slot*/
               int order = 0;
               for (String pre_id : proto.getPredecessorsList()) {
   
                   OperatorBase predecessor = operators.get(pre_id);
                   /* Only works without replicate topology */
                   predecessor.connectTo(0, operator, order);
                   order++;
               }
   
               /* Register a sink once, and only when it was connected to a predecessor */
               if (order > 0 && proto.getType().equals("sink")) {
                   sinks.add(operator);
               }
   
               /* Successors are queued now; they are instantiated only after all of their
                * parents are in the operators map */
               enqueueSuccessors(planProto.getOperatorsList(), proto, protoList);
               enqueueSuccessors(planProto.getSinksList(), proto, protoList);
           }
   
           if (sinks.isEmpty()) {
               throw new WayangException("Plan cannot be built: no sink is connected to any operator");
           }
   
           return new WayangPlan(sinks.get(0));
       }
   
       /**
        * Appends to {@code queue} every candidate whose id is listed as a successor of
        * {@code proto} and that is not already waiting in the queue.
        */
       private void enqueueSuccessors(Iterable<OperatorProto> candidates,
                                      OperatorProto proto,
                                      LinkedList<OperatorProto> queue) {
           for (OperatorProto candidate : candidates) {
               if (proto.getSuccessorsList().contains(candidate.getId()) && !queue.contains(candidate)) {
                   queue.addLast(candidate);
               }
           }
       }
   
       /**
        * Instantiates the Wayang operator that corresponds to the type of {@code operator}.
        *
        * <p>Supported types are {@code "source"}, {@code "sink"}, {@code "reduce_by_key"},
        * {@code "map_partition"} and {@code "union"}. Sources and sinks use the operator's
        * path converted to a file URL; {@code "reduce_by_key"} and {@code "map_partition"}
        * receive the operator's serialized UDF.
        *
        * @param operator protobuf description of the operator
        * @return the newly created operator
        * @throws WayangException if the type is not supported or the operator's path cannot
        *         be converted to a URL
        */
       public OperatorBase createOperatorByType(OperatorProto operator){
   
           System.out.println("Typo: " + operator.getType());
           switch(operator.getType()){
               case "source":
                   return new TextFileSource(toFileUrl(operator.getPath()));
   
               case "sink":
                   return new TextFileSink<String>(
                           toFileUrl(operator.getPath()),
                           String.class
                   );
   
               case "reduce_by_key":
                   /* The parameters map has the dimension or positions that compose the
                    * GroupKey; the UDF is the function to be applied in Python workers.
                    * The reduce operator itself is returned (it used to be discarded in
                    * favour of a text-file sink). */
                   return new PyWayangReduceByOperator<String, String>(
                           operator.getParametersMap(),
                           operator.getUdf(),
                           String.class,
                           String.class,
                           false
                   );
   
               case "map_partition":
                   return new MapPartitionsOperator<>(
                       new MapPartitionsDescriptor<String, String>(
                           new WrappedPythonFunction<String, String>(
                               l -> l,
                               operator.getUdf()
                           ),
                           String.class,
                           String.class
                       )
                   );
   
               case "union":
                   return new UnionAllOperator<String>(
                           String.class
                   );
   
               default:
                   throw new WayangException("Operator Type not supported");
           }
       }
   
       /**
        * Converts a file-system path to the string form of its {@code file:} URL.
        *
        * @throws WayangException wrapping the {@link MalformedURLException} if the
        *         conversion fails, so the failure is not misreported as an unsupported type
        */
       private static String toFileUrl(String path) {
           try {
               return new File(path).toURI().toURL().toString();
           } catch (MalformedURLException e) {
               throw new WayangException("Cannot convert path to a file URL: " + path, e);
           }
       }
   
       /**
        * Resolves a resource name to a {@link URI}.
        *
        * <p>The lookup is performed with {@link Class#getResource(String)} on the runtime
        * class of the current thread.
        * NOTE(review): resolving against the thread's class rather than this controller's
        * class looks unintended; kept as is so existing callers see the same lookup.
        *
        * @param resourcePath resource name as accepted by {@link Class#getResource(String)}
        * @return the URI of the resource
        * @throws IllegalArgumentException if the resource does not exist or its URL is not
        *         a valid URI
        */
       public static URI createUri(String resourcePath) {
           URL resource = Thread.currentThread().getClass().getResource(resourcePath);
           if (resource == null) {
               /* getResource signals "not found" with null; fail with a clear message
                * instead of a NullPointerException */
               throw new IllegalArgumentException("Resource not found: " + resourcePath);
           }
   
           try {
               return resource.toURI();
           } catch (URISyntaxException e) {
               throw new IllegalArgumentException("Illegal URI.", e);
           }
   
       }
   
   }
   
   ```
   
   fc40b51c9c64a2be4b9ade89923dcd88f2da6ce4


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@wayang.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-wayang] glauesppen closed issue #251: ADD id to executions

Posted by "glauesppen (via GitHub)" <gi...@apache.org>.
glauesppen closed issue #251: ADD id to executions
URL: https://github.com/apache/incubator-wayang/issues/251


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@wayang.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org