Posted to dev@streampipes.apache.org by "luoluoyuyu (via GitHub)" <gi...@apache.org> on 2023/05/27 13:42:09 UTC

[D] About Kafka consumer data loss problem (streampipes)

GitHub user luoluoyuyu created a discussion: About Kafka consumer data loss problem

### Problem Description
SpKafkaConsumer creates its consumer asynchronously. If no offset has been committed yet for the consumer group on the topic, any data a producer writes to the topic while the consumer is still connecting is lost. (The situation that triggers the problem can be seen in https://github.com/apache/streampipes/blob/dev/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java#LL28C1-L28C1)
```
public class AdaptersTest {
  public void testAdapter(AdapterTesterBase adapterTester) throws Exception {
    adapterTester.startAdapterService();
    AdapterDescription adapterDescription = adapterTester.prepareAdapter();
    adapterTester.startAdapter(adapterDescription);
    List<Map<String, Object>> data = adapterTester.generateData();
    adapterTester.validateData(data);
  }
}
```
### Reason
In AdaptersTest, for example, the consumer and producer are created asynchronously. When no offset has been committed for the consumer group on the newly created topic, the consumer's default behavior (Kafka's default `auto.offset.reset=latest`) is to consume only data produced after it has connected to the topic. Any message the producer sends while the consumer is still connecting is therefore lost.
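
For illustration, here is a minimal, self-contained sketch of this race using only the plain kafka-clients API (the broker address, group id, topic name, and class name below are hypothetical and are not StreamPipes code): with the default `auto.offset.reset=latest`, a group without committed offsets only sees records produced after the consumer has joined, so anything produced during the connection window is skipped; uncommenting the marked line makes the group start from the earliest available offset instead.

```
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetResetSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "adapter-test-group");      // hypothetical, brand-new group
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // With the default auto.offset.reset=latest, records produced to the topic
    // before this consumer's first poll are never delivered to this group.
    // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("adapter-test-topic")); // hypothetical topic
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
      records.forEach(r -> System.out.println(r.value()));
    }
  }
}
```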

### Solutions
Add the corresponding configuration entry so that, if no offset has been committed for the consumer group, consumption starts from the beginning of the topic:

```
public class ConsumerConfigFactory extends AbstractConfigFactory {

  private static final String AUTO_OFFSET_RESET_CONFIG_DEFAULT = "earliest";

  @Override
  public Properties makeDefaultProperties() {
    Properties props = new Properties();
    // ... existing default consumer properties ...
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG_DEFAULT);
    return props;
  }
}
```
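
One note on this choice: `auto.offset.reset` only takes effect when the consumer group has no committed offset (or its committed offset is out of range), so groups that already have a committed position are unaffected; only a brand-new group replays the topic from its earliest retained offset, which is exactly what the integration test needs.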





GitHub link: https://github.com/apache/streampipes/discussions/1626

----
This is an automatically sent email for dev@streampipes.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@streampipes.apache.org


Re: [D] About Kafka consumer data loss problem (streampipes)

Posted by "luoluoyuyu (via GitHub)" <gi...@apache.org>.
GitHub user luoluoyuyu added a comment to the discussion: About Kafka consumer data loss problem

Hi, this is an issue I encountered while adding Kafka tests to the streampipes-integration-tests module, so I would like to add the AUTO_OFFSET_RESET_CONFIG setting to Kafka's default consumer configuration.

GitHub link: https://github.com/apache/streampipes/discussions/1626#discussioncomment-6022770

----
This is an automatically sent email for dev@streampipes.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@streampipes.apache.org


Re: [D] About Kafka consumer data loss problem (streampipes)

Posted by "luoluoyuyu (via GitHub)" <gi...@apache.org>.
GitHub user luoluoyuyu closed a discussion: About Kafka consumer data loss problem

### Problem Description
SpKafkaConsumer creates its consumer asynchronously. If no offset has been committed yet for the consumer group on the topic, any data a producer writes to the topic while the consumer is still connecting is lost. (The situation that triggers the problem can be seen in https://github.com/apache/streampipes/blob/dev/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java#LL28C1-L28C1)
```
public class AdaptersTest {
  public void testAdapter(AdapterTesterBase adapterTester) throws Exception {
    adapterTester.startAdapterService();
    AdapterDescription adapterDescription = adapterTester.prepareAdapter();
    adapterTester.startAdapter(adapterDescription);
    List<Map<String, Object>> data = adapterTester.generateData();
    adapterTester.validateData(data);
  }
}
```
### Reason
In AdaptersTest, for example, the consumer and producer are created asynchronously. When no offset has been committed for the consumer group on the newly created topic, the consumer's default behavior (Kafka's default `auto.offset.reset=latest`) is to consume only data produced after it has connected to the topic. Any message the producer sends while the consumer is still connecting is therefore lost.

### Solutions
Add the corresponding configuration entry so that, if no offset has been committed for the consumer group, consumption starts from the beginning of the topic:

```
public class ConsumerConfigFactory extends AbstractConfigFactory {

  private static final String AUTO_OFFSET_RESET_CONFIG_DEFAULT = "earliest";

  @Override
  public Properties makeDefaultProperties() {
    Properties props = new Properties();
    // ... existing default consumer properties ...
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG_DEFAULT);
    return props;
  }
}
```





GitHub link: https://github.com/apache/streampipes/discussions/1626

----
This is an automatically sent email for dev@streampipes.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@streampipes.apache.org


Re: [D] About Kafka consumer data loss problem (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
GitHub user bossenti added a comment to the discussion: About Kafka consumer data loss problem

Hi @luoluoyuyu 

thanks a lot for reporting 🙏
Since you already shared a possible solution: would you be open to contributing here?
We would of course help you get the contribution ready if needed.

GitHub link: https://github.com/apache/streampipes/discussions/1626#discussioncomment-6023918

----
This is an automatically sent email for dev@streampipes.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@streampipes.apache.org


Re: [D] About Kafka consumer data loss problem (streampipes)

Posted by "luoluoyuyu (via GitHub)" <gi...@apache.org>.
GitHub user luoluoyuyu added a comment to the discussion: About Kafka consumer data loss problem

Hi @bossenti  

I have created a PR regarding this discussion: #1629

GitHub link: https://github.com/apache/streampipes/discussions/1626#discussioncomment-6025895

----
This is an automatically sent email for dev@streampipes.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@streampipes.apache.org


Re: [D] About Kafka consumer data loss problem (streampipes)

Posted by "luoluoyuyu (via GitHub)" <gi...@apache.org>.
GitHub user luoluoyuyu reopened a discussion: About Kafka consumer data loss problem

### Problem Description
SpKafkaConsumer creates its consumer asynchronously. If no offset has been committed yet for the consumer group on the topic, any data a producer writes to the topic while the consumer is still connecting is lost. (The situation that triggers the problem can be seen in https://github.com/apache/streampipes/blob/dev/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java#LL28C1-L28C1)
```
public class AdaptersTest {
  public void testAdapter(AdapterTesterBase adapterTester) throws Exception {
    adapterTester.startAdapterService();
    AdapterDescription adapterDescription = adapterTester.prepareAdapter();
    adapterTester.startAdapter(adapterDescription);
    List<Map<String, Object>> data = adapterTester.generateData();
    adapterTester.validateData(data);
  }
}
```
### Reason
In AdaptersTest, for example, the consumer and producer are created asynchronously. When no offset has been committed for the consumer group on the newly created topic, the consumer's default behavior (Kafka's default `auto.offset.reset=latest`) is to consume only data produced after it has connected to the topic. Any message the producer sends while the consumer is still connecting is therefore lost.

### Solutions
Add the corresponding configuration entry so that, if no offset has been committed for the consumer group, consumption starts from the beginning of the topic:

```
public class ConsumerConfigFactory extends AbstractConfigFactory {

  private static final String AUTO_OFFSET_RESET_CONFIG_DEFAULT = "earliest";

  @Override
  public Properties makeDefaultProperties() {
    Properties props = new Properties();
    // ... existing default consumer properties ...
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG_DEFAULT);
    return props;
  }
}
```





GitHub link: https://github.com/apache/streampipes/discussions/1626

----
This is an automatically sent email for dev@streampipes.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@streampipes.apache.org