Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/12 09:10:46 UTC

[GitHub] [pulsar] lichuan6 opened a new issue #9186: record.getPublishTime return empty

lichuan6 opened a new issue #9186:
URL: https://github.com/apache/pulsar/issues/9186


   Hi, 
   
   I'm writing a sink connector using `io.core.Sink`. However, when I call `record.getMessage()` and check whether the message is present using `msg.isPresent()`, I get `false`. Why do I get a record with a message whose publish time and event time are both empty?
   
   ```java
   @Override
   public void write(Record<byte[]> record) {
       Optional<Message<byte[]>> msg = record.getMessage();
       Optional<Long> eventTime = record.getEventTime();
       // Optional.toString() renders as "Optional[<value>]".
       String t = eventTime.isPresent() ? eventTime.toString() : "Empty";
       if (msg.isPresent()) {
           System.out.println(">>>> PublishTime : " + msg.get().getPublishTime() + ", Eventtime : " + t);
       } else {
           System.out.println(">>>> eventtime " + t + " Record(empty) : " + record.toString());
       }
   }
   ``` 
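
A minimal sketch of a more defensive way to log these fields, so that an absent message or event time never leads to a `get()` on an empty `Optional`. The `MessageView` and `RecordView` interfaces and the `RecordDescriber` class are hypothetical stand-ins that mirror only the two accessors used above; they are not the real Pulsar types, and the sketch does not explain why the fields are absent.

```java
import java.util.Optional;

// Hypothetical stand-ins mirroring only the accessors used in the issue;
// these are NOT the real org.apache.pulsar types.
interface MessageView {
    long getPublishTime();
}

interface RecordView {
    Optional<MessageView> getMessage();
    Optional<Long> getEventTime();
}

final class RecordDescriber {
    private RecordDescriber() {}

    // Builds one log line without calling get() on an empty Optional.
    static String describe(RecordView record) {
        // Unwrap the event time so the log shows the raw value, not "Optional[...]".
        String eventTime = record.getEventTime()
                .map(t -> Long.toString(t))
                .orElse("Empty");
        // Read the publish time only when the message is present.
        String publishTime = record.getMessage()
                .map(m -> Long.toString(m.getPublishTime()))
                .orElse("Empty");
        return "PublishTime : " + publishTime + ", EventTime : " + eventTime;
    }
}
```

In a real sink the same `map(...).orElse(...)` pattern could be applied directly to the `Optional` values returned by `record.getMessage()` and `record.getEventTime()`.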


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] freeznet commented on issue #9186: record.getPublishTime return empty

Posted by GitBox <gi...@apache.org>.
freeznet commented on issue #9186:
URL: https://github.com/apache/pulsar/issues/9186#issuecomment-759516021


   I will run some tests to see if I can reproduce the problem. Does this problem happen on both standalone and k8s clusters?



[GitHub] [pulsar] lichuan6 edited a comment on issue #9186: record.getPublishTime return empty

Posted by GitBox <gi...@apache.org>.
lichuan6 edited a comment on issue #9186:
URL: https://github.com/apache/pulsar/issues/9186#issuecomment-759337140


   I wrote an application that consumes messages using the Go client, and there I can retrieve the publish time from `msg.PublishTime()`.
   
   ```go
   package main
   
   import (
   	"context"
   	"fmt"
   	"log"
   	"time"
   
   	"github.com/apache/pulsar-client-go/pulsar"
   )
   
   var (
   	token = "ey<...>"
   )
   
   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL:               "pulsar://pulsar-broker.pulsar:6650",
   		OperationTimeout:  30 * time.Second,
   		ConnectionTimeout: 30 * time.Second,
   		Authentication:    pulsar.NewAuthenticationToken(token),
   	})
   	if err != nil {
   		log.Fatalf("Could not instantiate Pulsar client: %v", err)
   	}
   
   	defer client.Close()
   
   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:            "public/default/pulsar",
   		SubscriptionName: "my-pulsar-sub-test",
   		Type:             pulsar.Shared,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   	defer consumer.Close()
   
   	for {
   		for i := 0; i < 10; i++ {
   			msg, err := consumer.Receive(context.Background())
   			if err != nil {
   				log.Fatal(err)
   			}
   
   			fmt.Printf("Received msgId: %#v, PublishTime : %v, -- content: '%s'\n",
   				msg.ID(),
   				msg.PublishTime(),
   				string(msg.Payload()))
   
   			consumer.Ack(msg)
   		}
   		time.Sleep(5 * time.Second)
   		log.Printf("Sleeping 5s ...")
   		fmt.Printf("Sleeping 5s ...")
   	}
   }
   
   ```
   
   output from pod:
   >
   Received msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:31878, entryID:4294, batchIdx:11, partitionIdx:0}, tracker:(*pulsar.ackTracker)(0xc0000976c0), consumer:(*pulsar.partitionConsumer)(0xc000292000), receivedTime:time.Time{wall:0xbff7ce429a0c36bb, ext:10088415541, loc:(*time.Location)(0xf47940)}}, PublishTime : 2021-01-13 17:46:50.43 +0800 CST, -- content: '{"kubernetes.annotations.kubernetes.io/psp":"eks.privileged","kubernetes.annotations.prometheus.io/port":"80","kubernetes.annotations.prometheus.io/scrape":"true","kubernetes.container_hash":"123123123123.dkr.ecr.cn-northwest-1.amazonaws.com.cn/apachepulsar/pulsar-all@sha256:6a181bfd32cd36271034a3e9c325d5ac7625b8d5b4c5fc0154434f176d8389c2","kubernetes.container_image":"123123123123.dkr.ecr.cn-northwest-1.amazonaws.com.cn/apachepulsar/pulsar-all:2.6.1","kubernetes.container_name":"pulsar-proxy","kubernetes.docker_id":"f235aa5d86b7aa8600ec615306e379cc8f5ad66447cd58e1aa807a1766dfc59e","kubernetes.host":"ip-172-24-162-12.cn-northwest-1.compute.internal","kubernetes.labels.app":"pulsar","kubernetes.labels.cluster":"2.6.1-pulsar","kubernetes.labels.component":"proxy","kubernetes.labels.controller-revision-hash":"pulsar-proxy-86dcb67f7b","kubernetes.labels.redeploy":"1609826649","kubernetes.labels.release":"2.6.1","kubernetes.labels.statefulset.kubernetes.io/pod-name":"pulsar-proxy-3","kubernetes.namespace_name":"pulsar","kubernetes.pod_id":"4bad6623-d9ae-408c-81c0-5e065868d455","kubernetes.pod_name":"pulsar-proxy-3","log":"09:46:49.647 [pulsar-proxy-io-2-1] INFO  org.apache.pulsar.proxy.server.ProxyConnection - [/172.24.136.239:45046] New connection opened\n","stream":"stdout"}'
   



[GitHub] [pulsar] lichuan6 commented on issue #9186: record.getPublishTime return empty

Posted by GitBox <gi...@apache.org>.
lichuan6 commented on issue #9186:
URL: https://github.com/apache/pulsar/issues/9186#issuecomment-758553655


   In the Pulsar cluster deployed on Kubernetes (above), I get no value for PublishTime. In a Pulsar standalone cluster, however, I can get `PublishTime` from the `msg.get().getPublishTime()` call.
   
   I also noticed that a message's event time may be empty. Is there any documentation I can refer to that explains when the event time can be empty?
   
   ```
   batchRecords size: 7, responseCount:: 0, ack : 0, nack : 0, queue size: 0
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446160465
   >>>> PublishTime : 1610446160465
   >>>> PublishTime : 1610446160465
   >>>> PublishTime : 1610446160465
   >>>> PublishTime : 1610446160465
   >>>> PublishTime : 1610446160465
   batchRecords size: 9, responseCount:: 0, ack : 0, nack : 0, queue size: 0
   >>>> PublishTime : 1610446165464, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446165464, Eventtime : Optional[11651379494838]
   >>>> PublishTime : 1610446165464, Eventtime : Optional[11651379494838]
   ```
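
Since the log above shows records both with and without an event time, a sink may want a single rule for which timestamp to use. The following is a rough sketch of one such rule, not something taken from Pulsar itself: prefer the event time, fall back to the publish time, and finally to a caller-supplied value. `TimestampChooser` and its parameters are hypothetical names, and the sketch assumes all three values use the same unit, which the log output above does not confirm.

```java
import java.util.Optional;

final class TimestampChooser {
    private TimestampChooser() {}

    // Prefers the event time, then the publish time, then the supplied fallback.
    // Assumption: all three values are expressed in the same unit.
    static long choose(Optional<Long> eventTime, Optional<Long> publishTime, long fallback) {
        if (eventTime.isPresent()) {
            return eventTime.get();
        }
        return publishTime.orElse(fallback);
    }
}
```

Passing the fallback explicitly (for example the sink's current clock reading) keeps the rule easy to test.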
   
   Thanks



[GitHub] [pulsar] codelipenghui commented on issue #9186: record.getPublishTime return empty

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #9186:
URL: https://github.com/apache/pulsar/issues/9186#issuecomment-1058894181


   The issue has had no activity for 30 days; marking it with the Stale label.



[GitHub] [pulsar] Remco2376 commented on issue #9186: record.getPublishTime return empty

Posted by GitBox <gi...@apache.org>.
Remco2376 commented on issue #9186:
URL: https://github.com/apache/pulsar/issues/9186#issuecomment-759499499


   I can confirm that Pulsar sink connectors occasionally receive records that do not contain eventTime and/or message. This behavior seems random. These Java calls fail randomly:
   
   long eventTime = record.getEventTime().orElseThrow(() -> new RuntimeException("EventTime does not exist in record"));
   Message<GenericRecord> message = record.getMessage().orElseThrow(() -> new RuntimeException("Message does not exist in record"));
   
   All records do actually contain an eventTime and message.
   
   This happened with Pulsar 2.6.1. I recently upgraded to 2.7.0 and haven't seen the same issue so far, but I've only tested for a very limited time. This issue was observed in Pulsar/Docker (https://pulsar.apache.org/docs/en/standalone-docker/).
   
   I'm not sure, but I got the impression that this issue could be related to high memory usage (https://github.com/streamnative/pulsar-io-cloud-storage/blob/master/src/main/java/org/apache/pulsar/io/jcloud/util/AvroRecordUtil.java contains a memory leak), so I'm currently logging memory usage to confirm that:
   
   long totalMemory = Runtime.getRuntime().totalMemory();
   long freeMemory = Runtime.getRuntime().freeMemory();
   long memoryUsage = totalMemory - freeMemory;
   
   System.out.println(totalMemory + " " + freeMemory + " " + memoryUsage);
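
The same measurement can be wrapped in a small helper so that each log line is labelled and also shows how close the heap is to its ceiling. This is only a sketch: the `HeapUsage` class and its output format are hypothetical, and `Runtime.maxMemory()` is added here as an extra data point that the snippet above does not use.

```java
final class HeapUsage {
    private HeapUsage() {}

    // Used heap in bytes: memory the JVM has claimed minus the part still free.
    static long usedBytes(long totalMemory, long freeMemory) {
        return totalMemory - freeMemory;
    }

    // Formats one labelled line; maxMemory is the ceiling the JVM may grow to.
    static String report(long totalMemory, long freeMemory, long maxMemory) {
        long used = usedBytes(totalMemory, freeMemory);
        long percentOfMax = maxMemory > 0 ? (used * 100) / maxMemory : 0;
        return "used=" + used + "B total=" + totalMemory + "B free=" + freeMemory
                + "B max=" + maxMemory + "B (" + percentOfMax + "% of max)";
    }

    // Samples the current JVM and formats the result.
    static String snapshot() {
        Runtime rt = Runtime.getRuntime();
        return report(rt.totalMemory(), rt.freeMemory(), rt.maxMemory());
    }
}
```

Calling `System.out.println(HeapUsage.snapshot())` next to the failing `orElseThrow` calls would make it easier to line up incomplete records with memory pressure.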



[GitHub] [pulsar] Remco2376 commented on issue #9186: record.getPublishTime return empty

Posted by GitBox <gi...@apache.org>.
Remco2376 commented on issue #9186:
URL: https://github.com/apache/pulsar/issues/9186#issuecomment-759520797


   It seems to happen both in standalone mode and in clusters. You might have to consume a significant number of records (100k or so) before you detect it. Sometimes the problem is there, sometimes not. But if you detect an incorrect record, you will most likely find many others in the same timeframe. That's one of the reasons I assume it could be related to high memory usage.
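
One way to check whether incomplete records really cluster in time is to count them per time bucket. The sketch below does that with one-minute buckets; `MissingFieldTally`, its method names, and the bucket size are all hypothetical choices made for illustration, not part of any Pulsar API.

```java
import java.util.Map;
import java.util.TreeMap;

final class MissingFieldTally {
    // One-minute buckets; an arbitrary choice for this sketch.
    private static final long BUCKET_MILLIS = 60_000L;

    // Bucket start (epoch millis) -> number of incomplete records, sorted by time.
    private final Map<Long, Integer> incompletePerBucket = new TreeMap<>();

    // Counts a record as incomplete when the message or the event time is absent.
    void observe(long arrivalMillis, boolean messagePresent, boolean eventTimePresent) {
        if (messagePresent && eventTimePresent) {
            return; // complete record, nothing to count
        }
        long bucketStart = (arrivalMillis / BUCKET_MILLIS) * BUCKET_MILLIS;
        incompletePerBucket.merge(bucketStart, 1, Integer::sum);
    }

    Map<Long, Integer> counts() {
        return incompletePerBucket;
    }
}
```

A sink could call `observe(System.currentTimeMillis(), msg.isPresent(), eventTime.isPresent())` for every record and print `counts()` periodically; a few buckets with large counts would support the burst theory.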


