Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/04/03 09:11:03 UTC

Slack digest for #general - 2020-04-03

2020-04-02 09:32:03 UTC - Alessandro R.: @Alessandro R. has joined the channel
----
2020-04-02 09:45:07 UTC - Nicolò Paganin: Thanks @John Duffie, we had the same problem. You saved us some debugging time :slightly_smiling_face:
----
2020-04-02 12:06:57 UTC - bry: @bry has joined the channel
----
2020-04-02 12:36:08 UTC - Dan Kitchen (TCEU): @Dan Kitchen (TCEU) has joined the channel
----
2020-04-02 15:49:01 UTC - Simba Khadder: I wrote up an article about why and how we moved from kafka to pulsar!
Here's the reddit link: <https://www.reddit.com/r/programming/comments/ftm68i/why_and_how_we_moved_from_apache_kafka_to_apache/>
(let me know if this is the wrong place to put this)
+1 : David Kjerrumgaard, Atif, Matteo Merli, Konstantinos Papalias, Sijie Guo, Matteo Minardi
----
2020-04-02 16:53:33 UTC - Santiago Del Campo: Hello!

Having the broker websocket enabled: Is there any way to only allow message consumption through websocket and disallow message production through websocket?
----
2020-04-02 16:54:34 UTC - Matteo Merli: You can do that with authentication
----
2020-04-02 16:55:20 UTC - Matteo Merli: if there's no auth enabled, you could place an nginx / envoy proxy in front and filter there
----
2020-04-02 16:57:35 UTC - Santiago Del Campo: No, auth is not enabled. Interesting idea regarding nginx... what configuration would be needed?
----
2020-04-02 16:59:25 UTC - Matteo Merli: if you put it behind a reverse proxy, just proxy the URL for the /..../consume/.... path
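
The path-based filtering suggested here can be sketched as a small allow-list check. This only illustrates the rule a reverse proxy would apply, written in Java to match the rest of the thread. The class name `WebSocketPathFilter` is hypothetical, and the `/ws/v2/consumer/` prefix assumes the standard Pulsar WebSocket URL layout, with producer, consumer and reader endpoints under `/ws/v2/`. Adjust the prefix if your deployment exposes different paths.

```java
final class WebSocketPathFilter {

    // Assumed prefix of the Pulsar WebSocket consumer endpoint.
    private static final String CONSUMER_PREFIX = "/ws/v2/consumer/";

    // Returns true only for requests that target the consumer endpoint.
    static boolean isAllowed(String path) {
        // Reject anything that tries to climb out of the consumer prefix.
        if (path.contains("..")) {
            return false;
        }
        return path.startsWith(CONSUMER_PREFIX);
    }

    public static void main(String[] args) {
        // Consumer URL: allowed.
        System.out.println(isAllowed("/ws/v2/consumer/persistent/public/default/my-topic/my-sub"));
        // Producer URL: rejected.
        System.out.println(isAllowed("/ws/v2/producer/persistent/public/default/my-topic"));
    }
}
```

In nginx or envoy the same idea becomes a location or route match on the consumer prefix, with every other path rejected.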
----
2020-04-02 17:03:14 UTC - Santiago Del Campo: Mmmm right right.
----
2020-04-02 17:03:57 UTC - Santiago Del Campo: gonna try that one out. thanks!
----
2020-04-02 17:35:27 UTC - Rattanjot Singh: Hi! I am sending a byte[] to a Pulsar function, but when I receive it, it has been converted to a String. Is there any way I can still receive it as a byte[]? Here is my Pulsar function:
```
package pulsarfunctions.starter.sdk;

import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
import pulsarfunctions.starter.utils.ObjectSerializer;

import java.util.*;
import java.util.stream.Collectors;

public class VirtualTopicMultiple implements Function<String, Void> {

    private static final String TOPIC_1 = "abc7";
    private static final String TOPIC_2 = "abc8";
    private static final String TOPIC_3 = "abc9";
    private static final String TOPIC_4 = "abc10";

    @Override
    public Void process(String input, Context context) {
        Logger LOG = context.getLogger();
//        LOG.info(input);
      
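        Optional<String> topicName = context.getCurrentRecord().getTopicName();
        Optional<String> value1 = Optional.of("persistent://public/default/Test1");
        Optional<String> value2 = Optional.of("persistent://public/default/Test2");

        // NOTE: the first branch was cut off in the digest. It is reconstructed here
        // by symmetry with the Test2 branch and is an assumption.
        if (topicName.equals(value1)) {

            context.publish(TOPIC_1, new ObjectSerializer().objectToByteArray(input));
            context.publish(TOPIC_2, new ObjectSerializer().objectToByteArray(input));

        } else if (topicName.equals(value2)) {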

            context.publish(TOPIC_3, new ObjectSerializer().objectToByteArray(input));
            context.publish(TOPIC_4, new ObjectSerializer().objectToByteArray(input));

        } 

      
        return null;
    }


    public static void main(String[] args) throws Exception {
        List<String> arrlist = new ArrayList<String>();
        Vector<String> v = new Vector<String>();
        v.add("inputTopic1");
        v.add("inputTopic2");
        Enumeration<String> e = v.elements();
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName("Awesome");
//        functionConfig.setInputs(Collections.list(e));
        functionConfig.setClassName(VirtualTopicMultiple.class.getName());
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setOutput("output");
        functionConfig.setTopicsPattern("persistent://public/default/Test.*");
//        functionConfig.set

        functionConfig.setLogTopic("logger");

        LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).brokerServiceUrl("pulsar://localhost:6650").build();
        localRunner.start(true);
    }
}
```

----
2020-04-02 17:36:22 UTC - Matteo Merli: `public Void process(byte[] input, Context context)`
+1 : Konstantinos Papalias, Rattanjot Singh, David Kjerrumgaard
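
A short standalone illustration of why the declared input type matters. It assumes the `String` input is produced by decoding the raw payload as UTF-8, which is an assumption about the default String handling and is not confirmed in this thread. The sample binary payload is the header of a Java serialization stream, which is only a guess at what `ObjectSerializer` produces. The class name `ByteRoundTrip` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class ByteRoundTrip {

    // Simulates a payload being decoded to a String and encoded back to bytes.
    static byte[] viaString(byte[] payload) {
        String decoded = new String(payload, StandardCharsets.UTF_8);
        return decoded.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Valid UTF-8 text survives the round trip.
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(text, viaString(text)));     // true

        // Arbitrary binary data does not: invalid sequences are replaced
        // with U+FFFD during decoding, so the original bytes are lost.
        byte[] binary = {(byte) 0xAC, (byte) 0xED, 0x00, 0x05};
        System.out.println(Arrays.equals(binary, viaString(binary))); // false
    }
}
```

Declaring the function with `byte[]` as its input type avoids this decode step, so the payload reaches `process` untouched.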
----
2020-04-02 17:40:12 UTC - Rattanjot Singh: Thanks
----
2020-04-02 17:48:37 UTC - Will F: @Will F has joined the channel
----
2020-04-02 17:49:11 UTC - David Kjerrumgaard: @Matteo Merli beat me to it  :smiley:
timer_clock : Matteo Merli
----
2020-04-03 08:12:45 UTC - ltamber: @ltamber has joined the channel
----
2020-04-03 08:19:19 UTC - Rattanjot Singh: Hi!
```
functionConfig.setTopicsPattern("persistent://public/default/*.Queue");
```
Now, would this function be applied to a topic named Test.Queue?
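
One way to check a question like this locally is to try the pattern with `java.util.regex`. The sketch below assumes the topics pattern is interpreted as a Java regular expression that must match the full topic name. That is an assumption, not something confirmed in this thread. The class name `TopicPatternCheck` and the second pattern are illustrative.

```java
import java.util.regex.Pattern;

final class TopicPatternCheck {

    // True when the regex matches the entire topic name.
    static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    public static void main(String[] args) {
        String topic = "persistent://public/default/Test.Queue";

        // Pattern from the question: "/*" means "zero or more slashes" and
        // "." means "any single character", so this is not a glob.
        System.out.println(matches("persistent://public/default/*.Queue", topic));    // false

        // Regex equivalent of the glob "*.Queue".
        System.out.println(matches("persistent://public/default/.*\\.Queue", topic)); // true
    }
}
```

Under that assumption, `*.Queue` would not select Test.Queue, while `.*\.Queue` would.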

----
2020-04-03 08:49:31 UTC - rmen: @rmen has joined the channel
----