Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/08/16 09:11:03 UTC

Slack digest for #general - 2020-08-16

2020-08-15 10:38:03 UTC - Christophe Bornet: For the API, I'm thinking of a way to register something like a gRPC CallStreamObserver (<https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html>) or a reactive streams Subscription (<https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html#request-long->), both of which have a request(n) method to signal how many items the producer can push. At startup, request would be called with the size of the queue. Then request(1) is called each time an item is removed from the queue.
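A minimal sketch of the request(n) credit scheme described in that message, using the JDK's `java.util.concurrent.Flow.Subscription` (JDK 9+) as a stand-in for the gRPC or reactive streams types. The names `CreditFlowSketch`, `CountingSource`, `BoundedReceiver` and `pushAvailable` are hypothetical and exist only for illustration; this is not Pulsar code, and it is single-threaded for clarity.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;

public class CreditFlowSketch {

    /** Hypothetical producer: pushes items only while it holds credit granted via request(n). */
    static final class CountingSource implements Flow.Subscription {
        private final BoundedReceiver receiver;
        private final int total;   // number of items this source will ever emit
        private long credit = 0;   // items the receiver has said it can accept
        private int next = 0;      // next item value to emit

        CountingSource(BoundedReceiver receiver, int total) {
            this.receiver = receiver;
            this.total = total;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n must be positive");
            }
            credit += n;
        }

        @Override
        public void cancel() {
            credit = 0;
            next = total; // nothing more will be emitted
        }

        /** Pushes as many items as the current credit allows; returns how many were pushed. */
        int pushAvailable() {
            int pushed = 0;
            while (credit > 0 && next < total) {
                credit--;
                receiver.onNext(next++);
                pushed++;
            }
            return pushed;
        }
    }

    /** Hypothetical consumer with a bounded queue that hands out credit to the producer. */
    static final class BoundedReceiver {
        private final int capacity;
        private final Queue<Integer> queue = new ArrayDeque<>();
        private Flow.Subscription subscription;

        BoundedReceiver(int capacity) {
            this.capacity = capacity;
        }

        void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(capacity); // at startup: grant credit equal to the queue size
        }

        void onNext(int item) {
            if (queue.size() >= capacity) {
                throw new IllegalStateException("producer exceeded granted credit");
            }
            queue.add(item);
        }

        /** Removes one item and, if there was one, grants the producer one more credit. */
        Integer poll() {
            Integer item = queue.poll();
            if (item != null) {
                subscription.request(1);
            }
            return item;
        }

        int size() {
            return queue.size();
        }
    }

    public static void main(String[] args) {
        BoundedReceiver receiver = new BoundedReceiver(2);
        CountingSource source = new CountingSource(receiver, 5);
        receiver.onSubscribe(source);

        System.out.println(source.pushAvailable()); // fills the queue
        System.out.println(source.pushAvailable()); // no credit left, nothing pushed
        receiver.poll();                            // frees one slot, grants one credit
        System.out.println(source.pushAvailable()); // one more item pushed
    }
}
```

Because credit is only granted when a slot is freed, the queue can never hold more than its capacity, whatever rate the producer tries to push at.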
----
2020-08-15 12:29:58 UTC - Fernando: That article confirms my understanding that messages shouldn’t be removed from the subscription’s backlog while they are unacknowledged. The problem is that if I look at the topic stats, the subscription is just gone :s
+1 : Julius S
----
2020-08-15 12:39:09 UTC - Fernando: It looks similar to this bug <https://github.com/apache/pulsar/issues/5579>; however, my namespace doesn’t have a TTL. Instead it has infinite retention, which makes it weirder
----
2020-08-15 15:43:52 UTC - Joshua Decosta: ```    public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion,
                                     String targetBroker, String originalPrincipal, AuthData originalAuthData,
                                     String originalAuthMethod) {
        CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
        connectBuilder.setClientVersion(libVersion != null ? libVersion : "Pulsar Client");
        connectBuilder.setAuthMethodName(authMethodName);

        if (targetBroker != null) {
            // When connecting through a proxy, we need to specify which broker do we want to be proxied through
            connectBuilder.setProxyToBrokerUrl(targetBroker);
        }

        if (authData != null) {
            connectBuilder.setAuthData(ByteString.copyFrom(authData.getBytes()));
        }

        if (originalPrincipal != null) {
            connectBuilder.setOriginalPrincipal(originalPrincipal);
        }

        if (originalAuthData != null) {
            connectBuilder.setOriginalAuthData(new String(originalAuthData.getBytes(), UTF_8));
        }

        if (originalAuthMethod != null) {
            connectBuilder.setOriginalAuthMethod(originalAuthMethod);
        }
        connectBuilder.setProtocolVersion(protocolVersion);
        connectBuilder.setFeatureFlags(getFeatureFlags());
        CommandConnect connect = connectBuilder.build();
        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect));
        connect.recycle();
        connectBuilder.recycle();
        return res;
    }```
After debugging for a bit I keep hitting this method in `Commands.java` and I see it's setting the original principal with the original principal's auth data. Why is this happening? Am I supposed to be looking for the original principal's auth data in my `AuthorizationProvider` methods? I still can't figure out why the data isn't being passed.
----
2020-08-15 15:56:59 UTC - Joshua Decosta: I'm not sure where I should be looking at this point.
----
2020-08-15 16:03:34 UTC - Joshua Decosta: This method in `ProxyConnection` gets hit often:

```    private void completeConnect() {
        LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
        if (hasProxyToBrokerUrl) {
            // Client already knows which broker to connect. Let's open a
            // connection there and just pass bytes in both directions
            state = State.ProxyConnectionToBroker;
            directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
                protocolVersionToAdvertise, sslHandlerSupplier);
            cancelKeepAliveTask();
        } else {
            // Client is doing a lookup, we can consider the handshake complete
            // and we'll take care of just topics and
            // partitions metadata lookups
            state = State.ProxyLookupRequests;
            lookupProxyHandler = new LookupProxyHandler(service, this);
            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
        }
    }```
That else branch gets hit almost every time.
----