You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/06/08 02:34:18 UTC

[GitHub] [incubator-pinot] npawar commented on a change in pull request #7026: Add Apache Pulsar low level and high level connector

npawar commented on a change in pull request #7026:
URL: https://github.com/apache/incubator-pinot/pull/7026#discussion_r647049045



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+
+
+public class PulsarConsumerFactory extends StreamConsumerFactory {
+  @Override
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {

Review comment:
       This class should implement `createPartitionGroupConsumer` instead of this method. After the interface refactoring for Kinesis, we made the PartitionGroup concept mainstream, and we will eventually deprecate `createPartitionLevelConsumer`. This method is no longer called anywhere.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/MessageIdStreamOffset.java
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MessageIdStreamOffset implements StreamPartitionMsgOffset {

Review comment:
       Please add Javadocs for all new implementation classes.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/MessageIdStreamOffset.java
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MessageIdStreamOffset implements StreamPartitionMsgOffset {
+  private Logger LOGGER = LoggerFactory.getLogger(MessageIdStreamOffset.class);
+  private MessageId _messageId;
+
+  public MessageIdStreamOffset(MessageId messageId){
+    _messageId = messageId;
+  }
+
+  public MessageIdStreamOffset(String messageId){
+    try {
+      _messageId = MessageId.fromByteArray(messageId.getBytes(StandardCharsets.UTF_8));
+    }catch (IOException e){

Review comment:
       Formatting: please add the missing spaces, e.g. `messageId) {` and `} catch (IOException e) {`.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
##########
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"

Review comment:
       The license header is missing, and so is the newline at the end of the file (by the way, there's an IntelliJ setting that adds the trailing newline automatically).

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarConfig {
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String PULSAR_PROP_PREFIX = "consumer.prop";
+  public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+  public static final String START_POSITION = "start_position";
+  public static final String DEFAULT_BOOTSTRAP_BROKERS = "pulsar://localhost:6650";
+
+  private String _pulsarTopicName;
+  private String _subscriberId;
+  private String _bootstrapServers;
+  private MessageId _initialMessageId = MessageId.latest;
+  private Map<String, String> _pulsarConsumerProperties;
+
+  public PulsarConfig(StreamConfig streamConfig, String subscriberId){
+    Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+    _pulsarTopicName = streamConfig.getTopicName();
+    _bootstrapServers = streamConfigMap.getOrDefault(BOOTSTRAP_SERVERS, DEFAULT_BOOTSTRAP_BROKERS);
+
+    Preconditions.checkNotNull(_bootstrapServers,

Review comment:
       This will never be null because of the default. We shouldn't be setting a default, right? Failing when this is not set is better.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarConfig {
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String PULSAR_PROP_PREFIX = "consumer.prop";
+  public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+  public static final String START_POSITION = "start_position";
+  public static final String DEFAULT_BOOTSTRAP_BROKERS = "pulsar://localhost:6650";
+
+  private String _pulsarTopicName;
+  private String _subscriberId;
+  private String _bootstrapServers;
+  private MessageId _initialMessageId = MessageId.latest;
+  private Map<String, String> _pulsarConsumerProperties;
+
+  public PulsarConfig(StreamConfig streamConfig, String subscriberId){
+    Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+    _pulsarTopicName = streamConfig.getTopicName();
+    _bootstrapServers = streamConfigMap.getOrDefault(BOOTSTRAP_SERVERS, DEFAULT_BOOTSTRAP_BROKERS);
+
+    Preconditions.checkNotNull(_bootstrapServers,
+        "Must specify bootstrap broker connect string " + BOOTSTRAP_SERVERS + " in high level pulsar consumer");

Review comment:
       Is this config only for the high-level (stream-level) Pulsar consumer? If yes, please rename it to PulsarStreamLevelConfig. If not, please change this message.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarConfig {
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String PULSAR_PROP_PREFIX = "consumer.prop";
+  public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+  public static final String START_POSITION = "start_position";
+  public static final String DEFAULT_BOOTSTRAP_BROKERS = "pulsar://localhost:6650";
+
+  private String _pulsarTopicName;
+  private String _subscriberId;
+  private String _bootstrapServers;
+  private MessageId _initialMessageId = MessageId.latest;
+  private Map<String, String> _pulsarConsumerProperties;
+
+  public PulsarConfig(StreamConfig streamConfig, String subscriberId){
+    Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+    _pulsarTopicName = streamConfig.getTopicName();
+    _bootstrapServers = streamConfigMap.getOrDefault(BOOTSTRAP_SERVERS, DEFAULT_BOOTSTRAP_BROKERS);
+
+    Preconditions.checkNotNull(_bootstrapServers,
+        "Must specify bootstrap broker connect string " + BOOTSTRAP_SERVERS + " in high level pulsar consumer");
+    _subscriberId = subscriberId;
+
+    String startPositionProperty = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, START_POSITION);
+    String startPosition = streamConfigMap.getOrDefault(startPositionProperty, "latest");
+    if(startPosition.equals("earliest")){

Review comment:
       The formatting is off here; it should be `if (startPosition.equals("earliest")) {`.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarConfig {
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String PULSAR_PROP_PREFIX = "consumer.prop";
+  public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+  public static final String START_POSITION = "start_position";
+  public static final String DEFAULT_BOOTSTRAP_BROKERS = "pulsar://localhost:6650";
+
+  private String _pulsarTopicName;
+  private String _subscriberId;
+  private String _bootstrapServers;
+  private MessageId _initialMessageId = MessageId.latest;
+  private Map<String, String> _pulsarConsumerProperties;
+
+  public PulsarConfig(StreamConfig streamConfig, String subscriberId){
+    Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+    _pulsarTopicName = streamConfig.getTopicName();
+    _bootstrapServers = streamConfigMap.getOrDefault(BOOTSTRAP_SERVERS, DEFAULT_BOOTSTRAP_BROKERS);
+
+    Preconditions.checkNotNull(_bootstrapServers,
+        "Must specify bootstrap broker connect string " + BOOTSTRAP_SERVERS + " in high level pulsar consumer");
+    _subscriberId = subscriberId;
+
+    String startPositionProperty = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, START_POSITION);
+    String startPosition = streamConfigMap.getOrDefault(startPositionProperty, "latest");

Review comment:
       Please add constants for the "latest" and "earliest" values.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarConfig {
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String PULSAR_PROP_PREFIX = "consumer.prop";

Review comment:
       Please rename this to PULSAR_CONSUMER_PROP_PREFIX.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarConfig {
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String PULSAR_PROP_PREFIX = "consumer.prop";
+  public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+  public static final String START_POSITION = "start_position";

Review comment:
       Can we keep the dot-separated naming convention, i.e. "start.position"?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionLevelConsumer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
+  private final ExecutorService _executorService;
+
+  public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+    _executorService = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
+      int timeoutMillis) {
+    final MessageId startMessageId = ((MessageIdStreamOffset) startMsgOffset).getMessageId();
+    final MessageId endMessageId =
+        endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId();
+
+    List<Message<byte[]>> messagesList = new ArrayList<>();
+    Future<PulsarMessageBatch> pulsarResultFuture =
+        _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList));
+
+    try {
+      return pulsarResultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      //The fetchMessages has thrown an exception. Most common cause is the timeout.
+      //We return the records fetched till now along with the next start offset.
+      LOGGER.warn("Error while fetching records from Pulsar", e);
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId));
+    }
+  }
+
+  public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId endMessageId,
+      List<Message<byte[]>> messagesList) {
+    try {
+      _reader.seek(startMessageId);
+
+      while (_reader.hasMessageAvailable()) {
+        Message<byte[]> nextMessage = _reader.readNext();
+
+        if (endMessageId != null) {

Review comment:
       `endMessageId` can never be null here, right? The caller already substitutes `MessageId.latest` when `endMsgOffset` is null.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionLevelConsumer {

Review comment:
       We should be implementing PartitionGroupConsumer, not PartitionLevelConsumer.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler implements StreamMetadataProvider {
+  private Logger LOGGER = LoggerFactory.getLogger(PulsarStreamMetadataProvider.class);
+
+  private String _clientId;
+  private StreamConfig _streamConfig;
+  private int _partition;
+
+  public PulsarStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    super(clientId, streamConfig, 0);

Review comment:
       `0` can be a real partition number, right? Should this default to a distinguishable value instead, e.g. a negative number?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PulsarConsumerTest {
+
+  public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
+  public static final String TEST_TOPIC = "test-topic";
+  public static final int NUM_PARTITION = 3;
+  public static final String MESSAGE_PREFIX = "sample_msg";
+  public static final int NUM_RECORDS_PER_PARTITION = 1000;
+  public static final String CLIENT_ID = "clientId";
+  private PulsarClient _pulsarClient;
+  private PulsarStandaloneCluster _pulsarStandaloneCluster;
+  private HashMap<Integer, MessageId> _partitionToFirstMessageIdMap = new HashMap<>();
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    _pulsarStandaloneCluster = new PulsarStandaloneCluster();
+
+    _pulsarStandaloneCluster.start();
+
+    PulsarAdmin admin =
+        PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + _pulsarStandaloneCluster.getAdminPort()).build();
+
+    String bootstrapServer = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort();
+
+    _pulsarClient = PulsarClient.builder().serviceUrl(bootstrapServer).build();
+
+    admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION);
+
+    publishRecords();
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    _pulsarStandaloneCluster.stop();
+  }
+
+  public void publishRecords()
+      throws Exception {
+    for(int p = 0; p < NUM_PARTITION ; p++) {
+      final int partition = p;
+      Producer<String> producer = _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC).messageRouter(new MessageRouter() {
+        @Override
+        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+          return partition;
+        }
+      }).create();
+
+      for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+        MessageId messageId = producer.send(MESSAGE_PREFIX + "_" + i);
+        if(!_partitionToFirstMessageIdMap.containsKey(partition)){
+          _partitionToFirstMessageIdMap.put(partition, messageId);
+        }
+      }
+
+    }
+
+    List<String> partitionedTopicList =  _pulsarClient.getPartitionsForTopic(TEST_TOPIC).get();
+
+    for(String p : partitionedTopicList) {

Review comment:
       Please remove the `System.out.println` ("sout") debug output.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamDecoderProvider;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Reader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
+  private Logger LOGGER;
+
+  private StreamMessageDecoder _messageDecoder;
+
+  private StreamConfig _streamConfig;
+  private PulsarConfig _pulsarStreamLevelStreamConfig;
+
+  private Reader<byte[]> _reader;
+
+  private long lastLogTime = 0;
+  private long lastCount = 0;
+  private long currentCount = 0L;
+
+  /**
+   * Creates a stream-level consumer. The Pulsar reader itself is only acquired in {@link #start()}.
+   *
+   * @param clientId client id (currently not used by this constructor)
+   * @param tableName table name, used to build the logger name
+   * @param streamConfig stream configuration, used for the {@link PulsarConfig} and the message decoder
+   * @param sourceFields fields passed to the message decoder
+   * @param subscriberId subscriber id passed to {@link PulsarConfig}
+   */
+  public PulsarStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig,
+      Set<String> sourceFields, String subscriberId) {
+    _streamConfig = streamConfig;
+    _pulsarStreamLevelStreamConfig = new PulsarConfig(streamConfig, subscriberId);
+
+    _messageDecoder = StreamDecoderProvider.create(streamConfig, sourceFields);
+
+    // Logger category is this class plus the table and topic names. It previously used PulsarConfig's class
+    // name, which attributed this consumer's log lines to the config class.
+    LOGGER = LoggerFactory.getLogger(
+        PulsarStreamLevelConsumer.class.getName() + "_" + tableName + "_" + streamConfig.getTopicName());
+    LOGGER.info("PulsarStreamLevelConsumer: streamConfig : {}", _streamConfig);
+  }
+
+  /**
+   * Acquires the Pulsar {@link Reader} for this consumer's config from {@link PulsarStreamLevelConsumerManager}.
+   */
+  @Override
+  public void start()
+      throws Exception {
+    _reader = PulsarStreamLevelConsumerManager.acquirePulsarConsumerForConfig(_pulsarStreamLevelStreamConfig);
+  }
+
+  @Override
+  public GenericRow next(GenericRow destination) {
+    try {
+      if (_reader.hasMessageAvailable()) {
+        final Message<byte[]> record = _reader.readNext();
+        destination = _messageDecoder.decode(record.getData(), destination);
+
+        ++currentCount;
+
+        final long now = System.currentTimeMillis();
+        // Log every minute or 100k events
+        if (now - lastLogTime > 60000 || currentCount - lastCount >= 100000) {
+          if (lastCount == 0) {
+            LOGGER.info("Consumed {} events from kafka stream {}", currentCount, _streamConfig.getTopicName());

Review comment:
       s/kafka/pulsar

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler implements StreamMetadataProvider {
+  private Logger LOGGER = LoggerFactory.getLogger(PulsarStreamMetadataProvider.class);
+
+  private String _clientId;
+  private StreamConfig _streamConfig;
+  private int _partition;
+
+  public PulsarStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    super(clientId, streamConfig, 0);
+    _clientId = clientId;
+    _streamConfig = streamConfig;
+  }
+
+  public PulsarStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+    _clientId = clientId;
+    _streamConfig = streamConfig;
+    _partition = partition;
+  }
+
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {
+    try {
+      return _pulsarClient.getPartitionsForTopic(_streamConfig.getTopicName()).get().size();
+    } catch (Exception e) {
+      //TODO: Handle error
+      return 0;
+    }
+  }
+
+  public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
+    throw new UnsupportedOperationException("The use of this method is not supported");
+  }
+
+  @Override
+  public StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull OffsetCriteria offsetCriteria,

Review comment:
       even this should not be implemented. Only `computePartitionGroupMetadata`

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionLevelConsumer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
+  private final ExecutorService _executorService;
+
+  /**
+   * Creates a consumer for one partition of the configured topic. Fetches are submitted to a dedicated
+   * single-threaded executor so that {@link #fetchMessages} can bound them with its timeout.
+   */
+  public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+    // NOTE(review): the default thread factory creates a non-daemon thread; make sure this executor is shut down.
+    this._executorService = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
+      int timeoutMillis) {
+    final MessageId startMessageId = ((MessageIdStreamOffset) startMsgOffset).getMessageId();
+    final MessageId endMessageId =
+        endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId();
+
+    List<Message<byte[]>> messagesList = new ArrayList<>();
+    Future<PulsarMessageBatch> pulsarResultFuture =
+        _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList));
+
+    try {
+      return pulsarResultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId));

Review comment:
       If so, it is better to catch TimeoutException separately from all other exceptions, so that we don't log in the timeout case.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.io.File;
+import org.apache.commons.io.FileUtils;
+
+
+public class PulsarStandaloneCluster {
+  public static final Integer DEFAULT_BROKER_PORT = 6650;
+  public static final Integer DEFAULT_ADMIN_PORT = 8080;
+  public static final String DEFAULT_DATA_MOUNT_DIRECTORY = "pulsar-data";
+  public static final String DEFAULT_CONF_MOUNT_DIRECTORY = "pulsar-conf";
+  public static final String DOCKER_CONTAINER_NAME = "pulsar_standalone_pinot";
+
+  private Integer _brokerPort;
+  private Integer _adminPort;
+  private String _dataMountDirectory;
+  private String _confMountDirectory;
+
+  private Process _pulsarCluster;
+
+  public static final String DOCKER_COMMAND =
+      "docker run --name " + DOCKER_CONTAINER_NAME + " -p %d:%d -p %d:%d " + "  --mount source=pulsardata,target=%s"
+          + "  --mount source=pulsarconf,target=%s " + "  apachepulsar/pulsar:2.7.2 bin/pulsar standalone";
+
+  public static final String DOCKER_STOP_COMMAND = "docker stop " + DOCKER_CONTAINER_NAME;
+  public static final String DOCKER_REMOVE_COMMAND = "docker rm " + DOCKER_CONTAINER_NAME;
+
+  /** Creates a cluster descriptor; every setting uses its default until overridden through a setter. */
+  public PulsarStandaloneCluster() {
+  }
+
+  /** Overrides the host port mapped to the Pulsar broker (default 6650). */
+  public void setBrokerPort(Integer brokerPort) {
+    this._brokerPort = brokerPort;
+  }
+
+  /** Overrides the host port mapped to the Pulsar admin HTTP endpoint (default 8080). */
+  public void setAdminPort(Integer adminPort) {
+    this._adminPort = adminPort;
+  }
+
+  /** Returns the configured broker port, or {@code DEFAULT_BROKER_PORT} when none was set. */
+  public Integer getBrokerPort() {
+    return _brokerPort != null ? _brokerPort : DEFAULT_BROKER_PORT;
+  }
+
+  /** Returns the configured admin port, or {@code DEFAULT_ADMIN_PORT} when none was set. */
+  public Integer getAdminPort() {
+    return _adminPort != null ? _adminPort : DEFAULT_ADMIN_PORT;
+  }
+
+  /** Overrides the name of the data directory created under the system temp directory. */
+  public void setDataMountDirectory(String dataMountDirectory) {
+    this._dataMountDirectory = dataMountDirectory;
+  }
+
+  /** Overrides the name of the conf directory created under the system temp directory. */
+  public void setConfMountDirectory(String confMountDirectory) {
+    this._confMountDirectory = confMountDirectory;
+  }
+
+  /**
+   * Launches a standalone Pulsar broker with {@code docker run} and waits for it to come up.
+   *
+   * @throws IllegalStateException if the docker process has already exited after the startup wait
+   */
+  public void start()
+      throws Exception {
+    // Reuse the getters so port defaulting lives in one place.
+    Integer brokerPort = getBrokerPort();
+    Integer adminPort = getAdminPort();
+    String dataMountDirectory = _dataMountDirectory == null ? DEFAULT_DATA_MOUNT_DIRECTORY : _dataMountDirectory;
+    String confMountDirectory = _confMountDirectory == null ? DEFAULT_CONF_MOUNT_DIRECTORY : _confMountDirectory;
+
+    File tempDir = FileUtils.getTempDirectory();
+    File dataDir = new File(tempDir, dataMountDirectory);
+    File confDir = new File(tempDir, confMountDirectory);
+    // NOTE(review): DOCKER_COMMAND mounts the named volumes "pulsardata"/"pulsarconf" and uses these absolute
+    // host paths only as in-container targets -- confirm that is intended.
+    dataDir.mkdirs();
+    confDir.mkdirs();
+
+    String formattedCommand = String
+        .format(DOCKER_COMMAND, brokerPort, brokerPort, adminPort, adminPort, dataDir.getAbsolutePath(),
+            confDir.getAbsolutePath());
+
+    // Tokenize on whitespace, as Runtime.exec(String) did (paths containing whitespace remain unsupported).
+    // Docker output goes to a log file: nothing reads the process pipes, and an unread pipe can fill up and
+    // stall the docker client.
+    File logFile = new File(tempDir, DOCKER_CONTAINER_NAME + ".log");
+    _pulsarCluster = new ProcessBuilder(formattedCommand.trim().split("\\s+"))
+        .redirectErrorStream(true)
+        .redirectOutput(ProcessBuilder.Redirect.to(logFile))
+        .start();
+
+    // Fixed startup wait. TODO: replace with a readiness poll (e.g. on the admin endpoint).
+    Thread.sleep(30000);
+
+    // `docker run` without -d stays attached to the container, so an exited process means the container failed
+    // to start (e.g. container name or port conflict) or stopped.
+    if (!_pulsarCluster.isAlive()) {
+      throw new IllegalStateException(
+          "Pulsar standalone docker process exited with code " + _pulsarCluster.exitValue() + ", see "
+              + logFile.getAbsolutePath());
+    }
+  }
+
+  public void stop()
+      throws Exception {

Review comment:
       also delete the temp files?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.io.File;
+import org.apache.commons.io.FileUtils;
+
+
+public class PulsarStandaloneCluster {
+  public static final Integer DEFAULT_BROKER_PORT = 6650;
+  public static final Integer DEFAULT_ADMIN_PORT = 8080;
+  public static final String DEFAULT_DATA_MOUNT_DIRECTORY = "pulsar-data";
+  public static final String DEFAULT_CONF_MOUNT_DIRECTORY = "pulsar-conf";
+  public static final String DOCKER_CONTAINER_NAME = "pulsar_standalone_pinot";
+
+  private Integer _brokerPort;
+  private Integer _adminPort;
+  private String _dataMountDirectory;
+  private String _confMountDirectory;
+
+  private Process _pulsarCluster;
+
+  public static final String DOCKER_COMMAND =

Review comment:
       Is there a way to do this without Docker? Something similar to how we bring up the test Kafka cluster or test ZK?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionLevelConsumer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
+  private final ExecutorService _executorService;
+
+  /**
+   * Creates a consumer for one partition of the configured topic. Fetches are submitted to a dedicated
+   * single-threaded executor so that {@link #fetchMessages} can bound them with its timeout.
+   */
+  public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+    // NOTE(review): the default thread factory creates a non-daemon thread; make sure this executor is shut down.
+    this._executorService = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
+      int timeoutMillis) {
+    final MessageId startMessageId = ((MessageIdStreamOffset) startMsgOffset).getMessageId();
+    final MessageId endMessageId =
+        endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId();
+
+    List<Message<byte[]>> messagesList = new ArrayList<>();
+    Future<PulsarMessageBatch> pulsarResultFuture =
+        _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList));
+
+    try {
+      return pulsarResultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId));

Review comment:
       will we end up printing this log continuously, if the stream is idle?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.io.File;
+import org.apache.commons.io.FileUtils;
+
+
+public class PulsarStandaloneCluster {
+  public static final Integer DEFAULT_BROKER_PORT = 6650;
+  public static final Integer DEFAULT_ADMIN_PORT = 8080;
+  public static final String DEFAULT_DATA_MOUNT_DIRECTORY = "pulsar-data";
+  public static final String DEFAULT_CONF_MOUNT_DIRECTORY = "pulsar-conf";
+  public static final String DOCKER_CONTAINER_NAME = "pulsar_standalone_pinot";
+
+  private Integer _brokerPort;
+  private Integer _adminPort;
+  private String _dataMountDirectory;
+  private String _confMountDirectory;
+
+  private Process _pulsarCluster;
+
+  public static final String DOCKER_COMMAND =
+      "docker run --name " + DOCKER_CONTAINER_NAME + " -p %d:%d -p %d:%d " + "  --mount source=pulsardata,target=%s"
+          + "  --mount source=pulsarconf,target=%s " + "  apachepulsar/pulsar:2.7.2 bin/pulsar standalone";
+
+  public static final String DOCKER_STOP_COMMAND = "docker stop " + DOCKER_CONTAINER_NAME;
+  public static final String DOCKER_REMOVE_COMMAND = "docker rm " + DOCKER_CONTAINER_NAME;
+
+  /** Creates a cluster descriptor; every setting uses its default until overridden through a setter. */
+  public PulsarStandaloneCluster() {
+  }
+
+  /** Overrides the host port mapped to the Pulsar broker (default 6650). */
+  public void setBrokerPort(Integer brokerPort) {
+    this._brokerPort = brokerPort;
+  }
+
+  /** Overrides the host port mapped to the Pulsar admin HTTP endpoint (default 8080). */
+  public void setAdminPort(Integer adminPort) {
+    this._adminPort = adminPort;
+  }
+
+  /** Returns the configured broker port, or {@code DEFAULT_BROKER_PORT} when none was set. */
+  public Integer getBrokerPort() {
+    return _brokerPort != null ? _brokerPort : DEFAULT_BROKER_PORT;
+  }
+
+  /** Returns the configured admin port, or {@code DEFAULT_ADMIN_PORT} when none was set. */
+  public Integer getAdminPort() {
+    return _adminPort != null ? _adminPort : DEFAULT_ADMIN_PORT;
+  }
+
+  /** Overrides the name of the data directory created under the system temp directory. */
+  public void setDataMountDirectory(String dataMountDirectory) {
+    this._dataMountDirectory = dataMountDirectory;
+  }
+
+  /** Overrides the name of the conf directory created under the system temp directory. */
+  public void setConfMountDirectory(String confMountDirectory) {
+    this._confMountDirectory = confMountDirectory;
+  }
+
+  public void start()
+      throws Exception {
+    Integer brokerPort = _brokerPort == null ? DEFAULT_BROKER_PORT : _brokerPort;
+    Integer adminPort = _adminPort == null ? DEFAULT_ADMIN_PORT : _adminPort;
+    String dataMountDirectory = _dataMountDirectory == null ? DEFAULT_DATA_MOUNT_DIRECTORY : _dataMountDirectory;
+    String confMountDirectory = _confMountDirectory == null ? DEFAULT_CONF_MOUNT_DIRECTORY : _confMountDirectory;
+
+    File tempDir = FileUtils.getTempDirectory();
+    File dataDir = new File(tempDir, dataMountDirectory);
+    File confDir = new File(tempDir, confMountDirectory);
+    dataDir.mkdirs();
+    confDir.mkdirs();
+
+    String formattedCommand = String
+        .format(DOCKER_COMMAND, brokerPort, brokerPort, adminPort, adminPort, dataDir.getAbsolutePath(),
+            confDir.getAbsolutePath());
+
+    _pulsarCluster = Runtime.getRuntime().exec(formattedCommand);
+
+    Thread.sleep(30000);

Review comment:
       Can this be a TestCondition wait instead of a fixed sleep?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+
+
+public class PulsarConsumerFactory extends StreamConsumerFactory {
+  /** Creates a {@link PulsarPartitionLevelConsumer} for the given partition of the configured stream. */
+  @Override
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+    return new PulsarPartitionLevelConsumer(clientId, this._streamConfig, partition);
+  }
+
+  /** Creates a {@link PulsarStreamLevelConsumer}; {@code groupId} is used as the subscriber id. */
+  @Override
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
+      String groupId) {
+    return new PulsarStreamLevelConsumer(clientId, tableName, this._streamConfig, fieldsToRead, groupId);
+  }
+
+  /** Creates a {@link PulsarStreamMetadataProvider} bound to the given partition. */
+  @Override
+  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+    return new PulsarStreamMetadataProvider(clientId, this._streamConfig, partition);
+  }
+
+  /** Creates a {@link PulsarStreamMetadataProvider} that is not bound to a specific partition. */
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+    return new PulsarStreamMetadataProvider(clientId, this._streamConfig);
+  }
+
+  /** Offsets for Pulsar are message ids, so this returns a {@link MessageIdStreamOffsetFactory}. */
+  @Override
+  public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
+    return new MessageIdStreamOffsetFactory();
+  }
+
+  @Override
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+      PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+    throw new UnsupportedOperationException();

Review comment:
       Lines 34 and 61 need to be swapped (check the Kinesis impl for reference).

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionLevelConsumer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
+  private final ExecutorService _executorService;
+
+  /**
+   * Creates a consumer for one partition of the configured topic, backed by its own single-threaded executor.
+   */
+  public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+    // NOTE(review): the default thread factory creates a non-daemon thread; make sure this executor is shut down.
+    this._executorService = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,

Review comment:
       mark `endMsgOffset` as nullable

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+
+
+public class PulsarConsumerFactory extends StreamConsumerFactory {
+  /** Creates a {@link PulsarPartitionLevelConsumer} for the given partition of the configured stream. */
+  @Override
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+    return new PulsarPartitionLevelConsumer(clientId, this._streamConfig, partition);
+  }
+
+  /** Creates a {@link PulsarStreamLevelConsumer}; {@code groupId} is used as the subscriber id. */
+  @Override
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
+      String groupId) {
+    return new PulsarStreamLevelConsumer(clientId, tableName, this._streamConfig, fieldsToRead, groupId);
+  }
+
+  @Override
+  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+    return new PulsarStreamMetadataProvider(clientId, _streamConfig, partition);

Review comment:
       Same for this one. It shouldn't be used anymore.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler implements StreamMetadataProvider {
+  private Logger LOGGER = LoggerFactory.getLogger(PulsarStreamMetadataProvider.class);
+
+  private String _clientId;
+  private StreamConfig _streamConfig;
+  private int _partition;
+
+  public PulsarStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    super(clientId, streamConfig, 0);
+    _clientId = clientId;
+    _streamConfig = streamConfig;
+  }
+
+  public PulsarStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+    _clientId = clientId;
+    _streamConfig = streamConfig;
+    _partition = partition;
+  }
+
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {

Review comment:
       We should only be implementing `computePartitionGroupMetadata`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org