You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:25:46 UTC

[GitHub] [pulsar] kk-0619 opened a new issue #7036: State storage not working after running the functions for a while.

kk-0619 opened a new issue #7036:
URL: https://github.com/apache/pulsar/issues/7036


   **Describe the bug**
   The state storage function stops working after data has been sent into state storage for about an hour.
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Run the **ProducerTest.java**
   `package pulsar;
   
   import java.text.SimpleDateFormat;
   import java.util.Date;
   import java.util.Random;
   import org.apache.pulsar.client.impl.schema.JSONSchema;
   import org.json.simple.JSONObject;
   import org.json.simple.parser.JSONParser;
   import pulsar.core.PulsarProducer;
   import pulsar.models.Occupancy_Offset;
   
   /**
    * Load generator for the reproduction: sends an endless stream of
    * {@code Occupancy_Offset} messages through the project's {@code PulsarProducer} wrapper.
    */
   public class ProducerTest {
   
       /**
        * Builds the producer and sends messages in a tight loop until the process is killed.
        *
        * @param args unused
        * @throws Exception propagated from the {@code PulsarProducer} calls
        */
       public static void main(String[] args) throws Exception {
           // Second argument is presumably the topic name -- confirm against PulsarProducer.
           PulsarProducer<Occupancy_Offset> pulsarProducer = new PulsarProducer("pulsar://192.168.2.73:6650", "statetest")
                   .init(JSONSchema.of(Occupancy_Offset.class));
   
           // The pattern never changes, so the formatter is built once instead of once per message.
           // It is only used from this thread, so sharing the instance is safe here.
           // NOTE(review): 'Z' is a quoted literal, yet the time is formatted in the JVM's default
           // time zone -- confirm whether UTC timestamps were intended.
           SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
   
           // A single message object is reused and overwritten on every iteration.
           Occupancy_Offset ocoff = new Occupancy_Offset();
           Random rand = new Random();
           while (true) {
               ocoff.dateTime = timestampFormat.format(new Date());
               ocoff.occupancy = rand.nextInt(9) + 1; // uniform in 1..9
               ocoff.offset = 1;//rand.nextInt(9) + 1;
               System.out.println(ocoff);
               pulsarProducer.send(ocoff);
           }
       }
   
   }`
   
   2. State function
   `package pulsar;
   
   import java.io.PrintWriter;
   import java.io.StringWriter;
   import java.nio.ByteBuffer;
   import static java.nio.charset.StandardCharsets.UTF_8;
   
   import java.util.Calendar;
   import java.util.Collections;
   import java.util.HashMap;
   import javax.management.loading.MLet;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.impl.schema.JSONSchema;
   import org.apache.pulsar.common.functions.FunctionConfig;
   import org.apache.pulsar.functions.LocalRunner;
   import org.json.simple.JSONObject;
   import org.json.simple.parser.JSONParser;
   
   import org.apache.pulsar.functions.api.Context;
   import org.apache.pulsar.functions.api.Function;
   import org.json.simple.JSONArray;
   import pulsar.models.EventRawData;
   import pulsar.models.Occupancy_Offset;
   
   /**
    * Pulsar Function from the reproduction: keeps running totals of {@code occupancy} and
    * {@code offset} as a JSON document in function state under a single key, and publishes a
    * totals message for every input record.
    */
   public class StateTestFunction implements Function<Occupancy_Offset, Void> {
   
       /**
        * Reads the stored totals, adds the incoming values, writes the result back to state and
        * sends an {@code Occupancy_Offset} carrying the totals that were read.
        *
        * <p>All exceptions are caught and reported on stdout/stderr, so a failing record never
        * propagates out of this method.
        *
        * @param occdata incoming record whose occupancy and offset are added to the totals
        * @param context function context used for state access and for publishing
        * @return always {@code null}
        * @throws Exception declared by the interface; not thrown by this implementation
        */
       @Override
       public Void process(Occupancy_Offset occdata, Context context) throws Exception {
           System.out.println("\n\n\n-----------------STATETEST-----------------\n\n\n");
           try {
   
               int occupancydata = 0;
               int offsetdata = 0;
               String laststatetest = "testlaststate";
               JSONParser parser = new JSONParser();
               try {
                   ByteBuffer lastOccupancyState = context.getState(laststatetest);
                   JSONObject jo = new JSONObject();
                   if (lastOccupancyState != null) {
                       // Copy the bytes out instead of calling array(): array() exposes the whole
                       // backing array and throws for read-only or direct buffers.
                       jo = (JSONObject) parser.parse(new String(toBytes(lastOccupancyState), UTF_8));
                       System.out.println("jo 1 : " + jo.toJSONString());
                       // get old occupancy
                       occupancydata = Integer.parseInt(jo.get("occupancy").toString());
                       System.out.println("occupancydata : " + occupancydata);
                       offsetdata = Integer.parseInt(jo.get("offset").toString());
                       System.out.println("offsetdata : " + offsetdata);
                   } else{
                       System.out.println("[" + laststatetest + "] NOT SET YET");
                   }
   
                   jo.put("occupancy", occupancydata + occdata.occupancy);
                   jo.put("offset", offsetdata + occdata.offset);
                   System.out.println("jo 2 : " + jo.toJSONString());
                   context.putState(laststatetest, ByteBuffer.wrap(jo.toString().getBytes(UTF_8)));
               } catch (Exception e) {
                   // Best effort: a state failure is reported, and the totals read so far are still published.
                   System.out.println("test Exception: " + e.getMessage());
                   e.printStackTrace();
               }
   
               // NOTE(review): these are the totals read BEFORE this record was added; the updated
               // totals are only written to state. The "NEW" label suggests the updated values
               // may have been intended -- confirm before changing what is published.
               System.out.println("NEW Occupancy[" + laststatetest + "] : " + occupancydata + ", " + offsetdata);
   
               Occupancy_Offset total = new Occupancy_Offset();
               total.dateTime = occdata.dateTime;
               total.occupancy = occupancydata;
               total.offset = offsetdata;
               // The state key doubles as the name of the output topic.
               context.newOutputMessage(laststatetest, JSONSchema.of(Occupancy_Offset.class))
                       .value(total)
                       .send();
   
           } catch (NullPointerException npe) {
               System.out.println("NullPointerException error: " + npe.getMessage());
               // getMessage() is frequently null; the stack trace shows where the failure happened.
               npe.printStackTrace();
           } catch (Exception e) {
               System.out.println("Exception error: " + e.getMessage());
               e.printStackTrace();
           }
           System.out.println("-------------OUT-------------\n");
           return null;
       }
   
       /**
        * Copies the buffer's content from index 0 up to its limit without touching the caller's
        * position, limit or mark.
        *
        * @param buffer state value as returned by the context; must not be {@code null}
        * @return a new array holding the bytes in {@code [0, limit)}
        */
       private static byte[] toBytes(ByteBuffer buffer) {
           ByteBuffer view = buffer.duplicate();
           // Rewind so the whole value is read even if the buffer is handed over positioned at its end.
           view.rewind();
           byte[] bytes = new byte[view.remaining()];
           view.get(bytes);
           return bytes;
       }
   }`
   
   3. Let both run for several minutes to an hour.
   4. Data stops being uploaded to Presto.
   
   **Models**
   `import java.io.Serializable;
   
   // Plain serializable data holder with three public fields.
   // NOTE(review): StateTestFunction imports pulsar.models.EventRawData but its process()
   // method never uses it; this snippet also shows no package declaration -- confirm the package.
   public class EventRawData implements Serializable{
       // Presumably the event timestamp as text -- format not shown here; confirm.
       public String dateTime;
       // Presumably a count of entries -- confirm.
       public int in;
       // Presumably a count of exits -- confirm.
       public int out;
   }`
   
   `package pulsar.models;
   
   import java.io.Serializable;
   
   // Message payload exchanged by ProducerTest and StateTestFunction; both pass this class
   // to JSONSchema.of(...), so the public fields are what gets serialized.
   public class Occupancy_Offset implements Serializable {
   
       // Timestamp text; ProducerTest fills it with the pattern yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
       public String dateTime;
       // Occupancy value; ProducerTest sends a random value in 1..9.
       public int occupancy;
       // Offset value; ProducerTest always sends 1.
       public int offset;
   
       // Explicit no-argument constructor; fields keep their Java defaults until assigned.
       public Occupancy_Offset() {
       }
   
       // Renders the three fields as "dateTime|occupancy|offset".
       @Override
       public String toString() {
           return dateTime + "|" + occupancy + "|" + offset;
       }
   }`
   
   **Expected behavior**
   Data sent to the state storage should keep being uploaded to Presto for as long as the function runs. Actual behavior: data is uploaded to Presto at first, but after some time it stops and no more data is uploaded.
   
   **Screenshots**
   1. Output after function stopped working
   ![image](https://user-images.githubusercontent.com/65915456/82835364-63b26300-9ef6-11ea-8886-8c103d1b5165.png)
   
   
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org