You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by GitBox <gi...@apache.org> on 2022/06/19 22:23:29 UTC

[GitHub] [incubator-wayang] github-actions[bot] opened a new issue, #241: add to a config file

github-actions[bot] opened a new issue, #241:
URL: https://github.com/apache/incubator-wayang/issues/241

   add to a config file
   
   https://github.com/apache/incubator-wayang/blob/d859a97d43a8c3c3c964150eaff8f3833e41ea75/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java#L41
   
   ```java
   
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package org.apache.wayang.api.python.executor;
   
   import org.apache.wayang.api.python.function.PythonCode;
   import org.apache.wayang.api.python.function.PythonUDF;
   import org.apache.wayang.core.api.exception.WayangException;
   
   import java.io.BufferedOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.net.Socket;
   import java.net.SocketException;
   import java.nio.charset.StandardCharsets;
   import java.util.Iterator;
   import java.util.Map;
   
   /**
    * Pushes a serialized Python UDF followed by a sequence of input elements
    * through a {@link Socket}'s output stream.
    *
    * <p>Stream layout produced by {@link #send()}:
    * <ol>
    *   <li>the serialized UDF as a length-prefixed byte frame,</li>
    *   <li>one encoded frame per input element (see {@link #write(Object, DataOutputStream)}),</li>
    *   <li>the int marker {@link #END_OF_DATA_SECTION}.</li>
    * </ol>
    * A length-prefixed frame is a 4-byte int (written with
    * {@link DataOutputStream#writeInt(int)}, i.e. high byte first) followed by
    * exactly that many payload bytes.
    *
    * <p>I/O failures are reported as {@link WayangException} with the original
    * exception as cause, so that a broken connection stops the transfer instead
    * of being silently ignored.
    *
    * @param <Input>  type of the elements that are sent
    * @param <Output> result type of the UDF (not used by the feeder itself)
    */
   public class ProcessFeeder<Input, Output> {
   
       //TODO add to a config file
       /** Marker written once after the last element has been sent. */
       static final int END_OF_DATA_SECTION = -1;
       /** Marker written in place of a frame when an element is {@code null}. */
       static final int NULL = -5;
   
       //TODO use config buffer size
       /** Size in bytes of the buffer placed in front of the socket's output stream. */
       private static final int BUFFER_SIZE = 8 * 1024;
   
       private final Socket socket;
       private final PythonUDF<Input, Output> udf;
       private final PythonCode serializedUDF;
       private final Iterable<Input> input;
   
       /**
        * Creates a feeder; nothing is written until {@link #send()} is called.
        *
        * @param socket        connection whose output stream receives the data
        * @param udf           the UDF description (stored, not otherwise used here)
        * @param serializedUDF the serialized UDF that is sent ahead of the data
        * @param input         the elements to send
        * @throws WayangException if {@code input} is {@code null}
        */
       public ProcessFeeder(
               Socket socket,
               PythonUDF<Input, Output> udf,
               PythonCode serializedUDF,
               Iterable<Input> input){
   
           if(input == null) throw new WayangException("Nothing to process with Python API");
   
           this.socket = socket;
           this.udf = udf;
           this.serializedUDF = serializedUDF;
           this.input = input;
   
       }
   
       /**
        * Writes the UDF, all input elements and the end-of-data marker to the
        * socket and flushes the stream.
        *
        * <p>The stream is not closed here: closing a socket's output stream
        * closes the socket itself.
        *
        * @throws WayangException if the socket's stream cannot be obtained or a
        *                         write fails
        */
       public void send(){
   
           try {
               BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
               DataOutputStream dataOut = new DataOutputStream(stream);
   
               writeUDF(serializedUDF, dataOut);
               writeIteratorToStream(input.iterator(), dataOut);
               dataOut.writeInt(END_OF_DATA_SECTION);
               dataOut.flush();
   
           } catch (IOException e) {
               // Surface the failure; the data did not (completely) reach the peer.
               throw new WayangException("Could not send data to the Python process", e);
           }
       }
   
       /**
        * Writes the serialized UDF as a length-prefixed byte frame.
        *
        * @param serializedUDF the UDF to send
        * @param dataOut       the stream to write to
        * @throws WayangException if the write fails
        */
       public void writeUDF(PythonCode serializedUDF, DataOutputStream dataOut){
   
           writeBytes(serializedUDF.toByteArray(), dataOut);
           System.out.println("UDF written");
   
       }
   
       /**
        * Writes every remaining element of {@code iter} via
        * {@link #write(Object, DataOutputStream)}, in iteration order.
        *
        * @param iter    source of the elements
        * @param dataOut the stream to write to
        * @throws IOException     declared for compatibility with existing callers
        * @throws WayangException if an element cannot be written or has an
        *                         unsupported type
        */
       public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut)
           throws IOException {
   
           System.out.println("iterator being send");
           while (iter.hasNext()) {
               write(iter.next(), dataOut);
           }
       }
   
       /*TODO Missing case PortableDataStream */
       /**
        * Encodes a single element onto the stream.
        *
        * <ul>
        *   <li>{@code null}: the int marker {@link #NULL}</li>
        *   <li>{@code byte[]} / {@code Byte[]}: length-prefixed frame of the bytes</li>
        *   <li>{@link String}: length-prefixed frame of its UTF-8 bytes</li>
        *   <li>{@link Map.Entry}: the key followed by the value, each encoded
        *       by this method</li>
        * </ul>
        *
        * @param obj     the element to encode, may be {@code null}
        * @param dataOut the stream to write to
        * @throws WayangException if the type is not supported or the write fails
        */
       public void write(Object obj, DataOutputStream dataOut){
           try {
   
               if (obj == null) {
                   dataOut.writeInt(NULL);
               }
               // Byte array cases
               else if (obj instanceof Byte[] || obj instanceof byte[]) {
                   writeBytes(obj, dataOut);
               }
               // String case
               else if (obj instanceof String) {
                   writeUTF((String) obj, dataOut);
               }
               // Key, Value case
               else if (obj instanceof Map.Entry) {
                   writeKeyValue((Map.Entry<?, ?>) obj, dataOut);
               }
               else {
                   throw new WayangException("Unexpected element type " + obj.getClass());
               }
   
           } catch (IOException e) {
               throw new WayangException("Could not write element to the Python process", e);
           }
       }
   
       /**
        * Writes a {@code byte[]} or {@code Byte[]} as a length-prefixed frame.
        *
        * @param obj     a {@code byte[]} or a {@code Byte[]}
        * @param dataOut the stream to write to
        * @throws WayangException if {@code obj} is neither kind of byte array
        *                         (writing nothing would desynchronise the
        *                         stream) or if the write fails
        */
       public void writeBytes(Object obj, DataOutputStream dataOut){
   
           final byte[] bytes;
   
           if (obj instanceof byte[]) {
               bytes = (byte[]) obj;
   
           } else if (obj instanceof Byte[]) {
               // Unboxing Byte values. (Byte[] to byte[])
               Byte[] boxed = (Byte[]) obj;
               bytes = new byte[boxed.length];
               for (int i = 0; i < boxed.length; i++) {
                   bytes[i] = boxed[i].byteValue();
               }
   
           } else {
               String actual = (obj == null) ? "null" : obj.getClass().getName();
               throw new WayangException("Expected a byte array but got " + actual);
           }
   
           writeLengthPrefixed(bytes, dataOut);
       }
   
       /**
        * Writes a string as a length-prefixed frame of its UTF-8 bytes. The
        * length is the number of encoded bytes, not the number of characters.
        *
        * @param str     the string to send
        * @param dataOut the stream to write to
        * @throws WayangException if the write fails, including when the
        *                         connection has been closed or reset
        */
       public void writeUTF(String str, DataOutputStream dataOut){
   
           writeLengthPrefixed(str.getBytes(StandardCharsets.UTF_8), dataOut);
       }
   
       /**
        * Writes the key and then the value of an entry, each through
        * {@link #write(Object, DataOutputStream)}. No additional framing is
        * added around the pair.
        *
        * @param obj     the entry to send
        * @param dataOut the stream to write to
        * @throws WayangException if key or value cannot be written
        */
       public void writeKeyValue(Map.Entry<?, ?> obj, DataOutputStream dataOut){
   
           write(obj.getKey(), dataOut);
           write(obj.getValue(), dataOut);
       }
   
       /** Writes {@code bytes.length} as an int followed by the bytes themselves. */
       private static void writeLengthPrefixed(byte[] bytes, DataOutputStream dataOut){
   
           try {
               dataOut.writeInt(bytes.length);
               dataOut.write(bytes);
           } catch (IOException e) {
               throw new WayangException(
                       "Could not write a frame of " + bytes.length + " bytes to the Python process", e);
           }
       }
   
   }
   
   ```
   
   cac3777d06d10f33de59ddd1d466c1443654a845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@wayang.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org