Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/10/24 00:32:52 UTC

[GitHub] [pulsar] tuteng edited a comment on issue #5454: mysql JDBC Sink - consumer error

URL: https://github.com/apache/pulsar/issues/5454#issuecomment-545688370
 
 
   Please show your producer's code.
   
   The following is an example you can use for reference:
   
   ```
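   // Imports assumed for this snippet: Lombok, Guava and the Pulsar Java client.
   import java.util.Map;

   import com.google.common.collect.Maps;
   import lombok.AllArgsConstructor;
   import lombok.Builder;
   import lombok.Data;
   import lombok.NoArgsConstructor;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.schema.SchemaDefinition;
   import org.apache.pulsar.client.impl.schema.AvroSchema;

   // The wrapper class name is a placeholder; the original snippet showed only the body.
   public class JdbcSinkProducerExample {

       // POJO from which the Avro schema is derived.
       @Builder
       @Data
       @NoArgsConstructor
       @AllArgsConstructor
       public static class Foo3 {
           public String field1;
           public String field2;
           public int field3;
       }

       public static void main(String[] args) throws Exception {
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();

           AvroSchema<Foo3> schema = AvroSchema.of(
                   SchemaDefinition.<Foo3>builder()
                           .withPojo(Foo3.class)
                           .withAlwaysAllowNull(false)
                           .build());
           Producer<Foo3> producer = client.newProducer(schema)
                   .topic("test-jdbc")
                   .create();

           // Send 20 messages tagged with the INSERT event.
           for (int i = 0; i < 20; i++) {
               String key = "key-" + i;

               Foo3 obj = new Foo3();
               obj.setField1("field1_insert_" + i);
               obj.setField2("field2_insert_" + i);
               obj.setField3(i);
               Map<String, String> properties = Maps.newHashMap();
               properties.put("EVENT", "INSERT");

               producer.newMessage()
                       .properties(properties)
                       .key(key)
                       .value(obj)
                       .send();
           }

           // Send 20 messages tagged with the UPDATE event.
           // field1 keeps the value from the insert loop; only field2 changes.
           for (int i = 0; i < 20; i++) {
               String key = "key-" + i;

               Foo3 obj = new Foo3();
               obj.setField1("field1_insert_" + i);
               obj.setField2("field2_update_" + i);
               obj.setField3(i);
               Map<String, String> properties = Maps.newHashMap();
               properties.put("EVENT", "UPDATE");

               producer.newMessage()
                       .properties(properties)
                       .key(key)
                       .value(obj)
                       .send();
           }

           // Send 20 messages tagged with the DELETE event.
           for (int i = 0; i < 20; i++) {
               String key = "key-" + i;

               Foo3 obj = new Foo3();
               obj.setField1("field1_insert_" + i);
               obj.setField2("field2_delete_" + i);
               obj.setField3(i);
               Map<String, String> properties = Maps.newHashMap();
               properties.put("EVENT", "DELETE");

               producer.newMessage()
                       .properties(properties)
                       .key(key)
                       .value(obj)
                       .send();
           }

           // Release the producer and client connections.
           producer.close();
           client.close();
       }
   }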
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services