You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/20 09:04:01 UTC

[GitHub] [flink] syhily commented on a diff in pull request #19974: [FLINK-28083][Connector/Pulsar] Object-reusing for Pulsar source

syhily commented on code in PR #19974:
URL: https://github.com/apache/flink/pull/19974#discussion_r901427307


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java:
##########
@@ -20,26 +20,69 @@
 
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.flink.util.Collector;
+
+import org.apache.pulsar.client.api.Message;
 
 /**
  * The {@link RecordEmitter} implementation for both {@link PulsarOrderedSourceReader} and {@link
  * PulsarUnorderedSourceReader}. We would always update the last consumed message id in this
  * emitter.
  */
 public class PulsarRecordEmitter<T>
-        implements RecordEmitter<PulsarMessage<T>, T, PulsarPartitionSplitState> {
+        implements RecordEmitter<Message<byte[]>, T, PulsarPartitionSplitState> {
+
+    private final PulsarDeserializationSchema<T> deserializationSchema;
+    private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();
+
+    public PulsarRecordEmitter(PulsarDeserializationSchema<T> deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
 
     @Override
     public void emitRecord(
-            PulsarMessage<T> element, SourceOutput<T> output, PulsarPartitionSplitState splitState)
+            Message<byte[]> element, SourceOutput<T> output, PulsarPartitionSplitState splitState)
             throws Exception {
-        // Sink the record to source output.
-        output.collect(element.getValue(), element.getEventTime());
-        // Update the split state.
-        splitState.setLatestConsumedId(element.getId());
+        // Update the source output.
+        sourceOutputWrapper.setSourceOutput(output);
+        sourceOutputWrapper.setTimestamp(element);
+
+        deserializationSchema.deserialize(element, sourceOutputWrapper);
+        splitState.setLatestConsumedId(element.getMessageId());
+    }
+
+    private static class SourceOutputWrapper<T> implements Collector<T> {
+        private SourceOutput<T> sourceOutput;
+        private long timestamp;
+
+        @Override
+        public void collect(T record) {
+            if (timestamp > 0) {
+                sourceOutput.collect(record, timestamp);
+            } else {
+                sourceOutput.collect(record);
+            }
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do here.
+        }
+
+        private void setSourceOutput(SourceOutput<T> sourceOutput) {
+            this.sourceOutput = sourceOutput;
+        }
+
+        /**
+         * Get the event timestamp from Pulsar. Zero means there is no event time. See {@link
+         * Message#getEventTime()} to get the reason why it returns zero.
+         */
+        private void setTimestamp(Message<?> message) {
+            this.timestamp = message.getEventTime();

Review Comment:
   This is just to avoid having exactly the same code as the Kafka connector. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org