Posted to dev@pulsar.apache.org by Jia Zhai <zh...@apache.org> on 2019/08/28 02:20:04 UTC

PIP 41: Pluggable Protocol Handler

Hi all,

We are starting a proposal for a pluggable protocol handler in Apache
Pulsar.
The proposal is at:
https://github.com/apache/pulsar/wiki/PIP-41:-Pluggable-Protocol-Handler
Looking forward to any feedback.



================
Motivation

As a cloud-native messaging system, Pulsar is designed with a modern layered
architecture. It provides many reusable components such as the load manager,
namespace bundle distribution, topic lookup, and the streaming storage
abstraction. These components can be reused to support other messaging
protocols such as Kafka, AMQP, and MQTT. In PIP-42: KoP - Kafka on
Pulsar
<https://github.com/apache/pulsar/wiki/PIP-42%3A-KoP---Kafka-on-Pulsar> we
will implement the Kafka protocol natively in the Pulsar broker. This PIP
proposes introducing a pluggable protocol handler mechanism in the Pulsar
broker, so that the broker can dynamically load additional protocol handlers
at runtime and support other messaging protocols. This also allows developers
to extend Pulsar’s capabilities to other messaging domains while leveraging
the benefits provided by the Pulsar architecture.

Scope

It is almost impossible to define a perfect protocol handler interface on
the first attempt, so this PIP mainly focuses on defining the lifecycle of
protocol handlers. For now, a protocol handler gets direct access to all
components in Pulsar’s BrokerService. We will NOT attempt to define a clear
boundary between the components that a protocol handler can access and those
it cannot. We will defer defining that boundary until we have used this
interface to implement at least two messaging protocols, such as Kafka and
MQTT.

Protocol Handler

The ProtocolHandler interface is defined as follows:

// Assumed to be Guava's @Beta; the proposal does not say which annotation is meant.
import com.google.common.annotations.Beta;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;

/**
 * The protocol handler interface for supporting additional protocols on Pulsar brokers.
 */
@Beta
public interface ProtocolHandler extends AutoCloseable {

    /**
     * Returns the unique protocol name. For example, `kafka-v2` for a handler of the Kafka v2 protocol.
     */
    String protocolName();

    /**
     * Verify whether this protocol handler can speak the given <tt>protocol</tt>.
     *
     * @param protocol the protocol to verify
     * @return true if the protocol handler can handle the given protocol, otherwise false.
     */
    boolean accept(String protocol);

    /**
     * Initialize the protocol handler after it is constructed via reflection.
     *
     * @param conf broker service configuration
     * @throws Exception if the protocol handler fails to initialize.
     */
    void initialize(ServiceConfiguration conf) throws Exception;

    /**
     * Retrieve the protocol related data to advertise as part of
     * {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
     *
     * <p>For example, when implementing a Kafka protocol handler, you need to advertise
     * the corresponding Kafka listeners so that Pulsar brokers understand how to give back
     * the listener information when handling metadata requests.
     *
     * @return the protocol related data to be advertised as part of LocalBrokerData.
     */
    String getProtocolDataToAdvertise();

    /**
     * Start the protocol handler with the provided broker service.
     *
     * <p>The broker service provides access to Pulsar components such as the load
     * manager, namespace service, managed ledger, etc.
     *
     * @param service the broker service to start with.
     */
    void start(BrokerService service);

    /**
     * Create the channel initializers for the addresses that this protocol handler
     * will listen on.
     *
     * @return a map from each listening address to the channel initializer for that address.
     */
    Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers();

    /**
     * Shut down the protocol handler and release the resources it holds.
     */
    @Override
    void close();
}


A protocol handler goes through the following lifecycle phases, in order:

*INITIALIZE*, *ADVERTISE*, *START*, *STOP*

INITIALIZE

A protocol handler is constructed using ServiceLoader, so an
implementation of a protocol handler must provide a public no-args
constructor. After a protocol handler is constructed, the broker checks
whether it can handle a given protocol by calling #accept(String).

If a protocol handler accepts the given protocol, the Pulsar broker picks
it as a handler to load and then initializes it by calling
#initialize(ServiceConfiguration).

The implementation of #initialize should set up the handler using the
provided configuration.
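
The selection step described above can be sketched roughly as follows. This
is a simplified illustration, not the broker's actual loading code:
SimpleHandler, EchoHandler and HandlerSelector are hypothetical names,
java.util.Properties stands in for ServiceConfiguration, and the candidates
are passed in as a plain Iterable where a broker would presumably use the
result of ServiceLoader.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical, trimmed-down stand-in for the ProtocolHandler interface.
interface SimpleHandler {
    String protocolName();
    boolean accept(String protocol);
    void initialize(Properties conf) throws Exception;
}

// Hypothetical handler. The public no-args constructor is what
// reflection-based loading relies on.
class EchoHandler implements SimpleHandler {
    boolean initialized = false;

    public EchoHandler() { }

    @Override public String protocolName() { return "echo"; }

    @Override public boolean accept(String protocol) {
        return "echo".equalsIgnoreCase(protocol);
    }

    @Override public void initialize(Properties conf) { initialized = true; }
}

class HandlerSelector {
    // Returns the first candidate that accepts the protocol, after initializing it.
    // Candidates that do not accept the protocol are never initialized.
    static Optional<SimpleHandler> select(String protocol,
                                          Iterable<? extends SimpleHandler> candidates,
                                          Properties conf) {
        for (SimpleHandler candidate : candidates) {
            if (!candidate.accept(protocol)) {
                continue;
            }
            try {
                candidate.initialize(conf);
            } catch (Exception e) {
                throw new IllegalStateException(
                        "Failed to initialize handler for " + protocol, e);
            }
            return Optional.of(candidate);
        }
        return Optional.empty();
    }
}
```

Wrapping the checked exception in an IllegalStateException is a choice made
for this sketch only; the PIP does not say how the broker reacts to a handler
that fails to initialize.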

ADVERTISE

In many messaging protocols, a *broker* needs to advertise its
listening ports and protocols to a service discovery service so that
messaging clients know how to connect to it correctly.

A protocol handler therefore implements getProtocolDataToAdvertise to
return the protocol-specific data to advertise to Pulsar’s topic discovery
system. Pulsar brokers and clients can then use this protocol-specific data
to learn how to connect to the brokers. How the protocol-specific data is
stored in Pulsar’s topic discovery system is explained in the Protocol Data
section.

In KoP’s implementation, we advertise Kafka’s listeners as the Kafka
protocol data, so the Kafka protocol handler can return the correct Kafka
broker listener information when handling topic metadata requests.
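
As an illustration of what such advertised data might look like, here is a
minimal sketch that packs listener information into a single string and
reads it back. The PIP does not define a format; the
"NAME://host:port,NAME://host:port" layout, the ListenerData class, and the
host names and ports used with it are all assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: encodes and decodes listener data as one opaque string.
// The "NAME://host:port,NAME://host:port" layout is an assumption for illustration.
class ListenerData {

    // Joins listener name -> "host:port" entries into a single string.
    static String encode(Map<String, String> listeners) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : listeners.entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getKey()).append("://").append(e.getValue());
        }
        return sb.toString();
    }

    // Splits the string produced by encode back into listener name -> "host:port".
    static Map<String, String> decode(String data) {
        Map<String, String> listeners = new LinkedHashMap<>();
        if (data == null || data.isEmpty()) {
            return listeners;
        }
        for (String entry : data.split(",")) {
            int sep = entry.indexOf("://");
            if (sep < 0) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            listeners.put(entry.substring(0, sep), entry.substring(sep + 3));
        }
        return listeners;
    }
}
```

A handler could return the encoded string from getProtocolDataToAdvertise
and decode the strings advertised by other brokers when it needs their
listener addresses.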

START

In the *START* phase, the Pulsar broker starts a protocol handler via
#start(BrokerService). The implementation can set up the resources the
protocol handler requires, and it gets access to Pulsar components via the
passed-in BrokerService instance.

Once the protocol handler is started, the Pulsar broker calls
newChannelInitializers() to obtain the channel initializers that the
protocol handler provides. The returned value is a map from listening
address to the actual channel initializer, because a messaging protocol
might listen on multiple ports for different security protocols.
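
The shape of that map can be sketched without pulling in Netty. In the
example below, PipelineSetup is a hypothetical stand-in for
ChannelInitializer<SocketChannel> that only lists the pipeline stages a real
initializer would install, and the class name, stage names and port numbers
are illustrative assumptions.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Netty's ChannelInitializer<SocketChannel>:
// it only lists the pipeline stages a real initializer would install.
interface PipelineSetup {
    List<String> stages();
}

class DualPortHandler {
    // Port numbers are illustrative assumptions, not values from the PIP.
    static final int PLAINTEXT_PORT = 9092;
    static final int TLS_PORT = 9093;

    // One entry per listening address; the TLS port gets an extra stage.
    static Map<InetSocketAddress, PipelineSetup> newChannelInitializers() {
        Map<InetSocketAddress, PipelineSetup> initializers = new LinkedHashMap<>();
        initializers.put(new InetSocketAddress(PLAINTEXT_PORT),
                () -> Arrays.asList("decoder", "requestHandler"));
        initializers.put(new InetSocketAddress(TLS_PORT),
                () -> Arrays.asList("tls", "decoder", "requestHandler"));
        return initializers;
    }
}
```

Keying the map by address lets the broker bind each port with the setup that
matches its security protocol.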

STOP

The Pulsar broker calls #close() to shut down a protocol handler. The
implementation should clean up the resources it uses.
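
One possible way to structure that cleanup is sketched below. The PIP does
not prescribe any particular approach; ClosableHandler and its register
method are hypothetical, and the choices to release resources in reverse
order and to make close() safe to call twice are assumptions of this
example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical handler skeleton showing only resource tracking and cleanup.
class ClosableHandler implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Would be called from start() for each resource the handler acquires.
    void register(AutoCloseable resource) {
        resources.push(resource);
    }

    // Releases resources in reverse order of acquisition; a second call is a no-op.
    @Override
    public void close() {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                // Keep going so one failing resource does not leak the rest.
                System.err.println("Failed to close resource: " + e);
            }
        }
    }
}
```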

Configuration

messagingProtocols is a newly introduced setting in the Pulsar broker for
configuring the list of protocols that the broker will load in addition
to the Pulsar protocol handler.

ProtocolHandler implementations are loaded using ServiceLoader, so the
Pulsar broker gets a list of candidate protocol handlers when it starts up.
The broker tests all candidate protocol handlers to see whether they support
one of the messaging protocols configured in messagingProtocols. It only
initializes the protocol handlers for the configured protocols.

For example, if ServiceLoader finds an MQTT protocol handler and a Kafka
protocol handler but the broker is configured to use only the Kafka
handler, the Pulsar broker will initialize only the Kafka handler.
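
The MQTT/Kafka example can be sketched as follows. This is an illustration
under assumptions: the PIP does not specify the value format of
messagingProtocols, so a comma-separated list is assumed here, and
ProtocolConfig is a hypothetical class in which each candidate handler is
represented only by a name and its accept check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ProtocolConfig {
    // Parses messagingProtocols; the comma-separated format is an assumption.
    static Set<String> configuredProtocols(Properties conf) {
        String raw = conf.getProperty("messagingProtocols", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Keeps the candidates whose accept check matches at least one configured protocol.
    // The map goes from a candidate's name to its accept(String) check.
    static List<String> handlersToInitialize(Map<String, Predicate<String>> candidates,
                                             Properties conf) {
        Set<String> configured = configuredProtocols(conf);
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Predicate<String>> candidate : candidates.entrySet()) {
            if (configured.stream().anyMatch(candidate.getValue())) {
                selected.add(candidate.getKey());
            }
        }
        return selected;
    }
}
```

With messagingProtocols set to kafka and two candidates that accept "mqtt"
and "kafka" respectively, only the Kafka candidate is selected.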

Protocol Data

We introduced a new field protocols in LocalBrokerData for storing
protocol-specific data for different registered protocols.

private Map<String, String> protocols;

The key of the map is the protocol name. The value of the map is an opaque
string for storing the protocol-specific data. Pulsar doesn’t need to know
how to interpret the value. Only the protocol implementation knows how to
interpret the value.
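
A minimal sketch of how such a map might be filled and read is shown below.
BrokerDataSketch is a hypothetical class that models only the protocols
field, not the real LocalBrokerData, and the sample value passed to it is
illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, trimmed-down view of the broker data that gets advertised.
class BrokerDataSketch {
    // Protocol name -> opaque protocol-specific data, as in the proposed field.
    private final Map<String, String> protocols = new LinkedHashMap<>();

    // Stores whatever the handler returned, without inspecting it.
    void advertise(String protocolName, String protocolData) {
        protocols.put(protocolName, protocolData);
    }

    // Returns the raw string, or null if the protocol was not advertised.
    // Only the protocol implementation knows how to parse the value.
    String protocolData(String protocolName) {
        return protocols.get(protocolName);
    }
}
```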

Compatibility, Deprecation, and Migration Plan

This change proposes introducing a new protocol handler mechanism in
Pulsar. It doesn’t touch any existing wire protocol or storage format, so
no compatibility, deprecation, or migration plan is needed for existing
Pulsar applications.


Test Plan

Existing tests cover the code changes related to the refactoring. New tests
will be added for the pluggable protocol handler.

Rejected Alternatives

Messaging Gateway

Issue #2556 <https://github.com/apache/pulsar/issues/2556> attempts to
introduce a messaging gateway / proxy to support different messaging
protocols. The initiative is similar to the proposal here. However, that
issue proposes adding the support in the proxy layer. There are a few
drawbacks to doing this in the gateway/proxy layer. Firstly, it doesn’t
leverage the advantages that the streaming storage layer already provides
(e.g. fencing, cursor management), which makes it difficult to support
protocols like Kafka. Secondly, it introduces additional overhead (e.g.
network bandwidth), which is not suitable for high-volume streaming
workloads. Lastly, it adds complexity to managing the system.