Posted to dev@streampipes.apache.org by "RobertIndie (via GitHub)" <gi...@apache.org> on 2023/05/16 15:19:11 UTC

[PR] Support pulsar messasging layer (streampipes)

RobertIndie opened a new pull request, #1576:
URL: https://github.com/apache/streampipes/pull/1576

   ### Purpose
   Add Pulsar messaging layer support.
   
   ### Remarks
   PR introduces (a) breaking change(s): <yes/no>
   
   PR introduces (a) deprecation(s): <yes/no>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "tenthe (via GitHub)" <gi...@apache.org>.
tenthe commented on code in PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#discussion_r1257724193


##########
streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.pulsar;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.grounding.PulsarTransportProtocol;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class PulsarConsumer implements EventConsumer {
+
+  private PulsarClient pulsarClient;
+  private Consumer<byte[]> consumer;
+  private PulsarTransportProtocol protocolSettings;
+
+  public PulsarConsumer(PulsarTransportProtocol protocolSettings) {
+    this.protocolSettings = protocolSettings;
+  }
+
+  @Override
+  public void connect(InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
+    try {
+      String serviceURL = "";
+      if (!protocolSettings.getBrokerHostname().startsWith("pulsar://")) {
+        // TODO: Add support for 'pulsar+ssl://'

Review Comment:
   Is it OK if we remove this TODO and create an issue for it instead?
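
For context on the TODO discussed above: the quoted hunk only shows that `connect` checks whether the broker hostname already starts with `pulsar://`. The following is a minimal sketch of how that check could be extended to also accept `pulsar+ssl://`. The class `PulsarServiceUrls`, the method `toServiceUrl`, and the `port` and `useTls` parameters are hypothetical names introduced for illustration; they are not part of the PR, and the actual fix tracked in the follow-up issue may look different.

```java
import java.util.Objects;

/** Hypothetical helper (not part of the PR): builds a Pulsar service URL from a broker host. */
final class PulsarServiceUrls {

  private static final String PLAIN_SCHEME = "pulsar://";
  private static final String TLS_SCHEME = "pulsar+ssl://";

  private PulsarServiceUrls() {
  }

  /**
   * Returns a service URL for the given broker host.
   * An explicit scheme in brokerHostname wins over the useTls flag.
   * Assumption: hosts are names or IPv4 addresses; IPv6 literals are not handled.
   */
  static String toServiceUrl(String brokerHostname, int port, boolean useTls) {
    Objects.requireNonNull(brokerHostname, "brokerHostname");
    String host = brokerHostname.trim();
    if (host.isEmpty()) {
      throw new IllegalArgumentException("brokerHostname must not be empty");
    }

    // Keep an explicit scheme as-is; otherwise prepend one based on the TLS flag.
    String withScheme;
    if (host.startsWith(PLAIN_SCHEME) || host.startsWith(TLS_SCHEME)) {
      withScheme = host;
    } else {
      withScheme = (useTls ? TLS_SCHEME : PLAIN_SCHEME) + host;
    }

    // Append the port only when the part after the scheme does not already carry one.
    String authority = withScheme.substring(withScheme.indexOf("://") + 3);
    return authority.contains(":") ? withScheme : withScheme + ":" + port;
  }
}
```

With this sketch, `PulsarServiceUrls.toServiceUrl("localhost", 6650, false)` yields `pulsar://localhost:6650`, while a value that already carries `pulsar+ssl://` and a port is passed through unchanged.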





Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1602570245

   I'm currently working on another task and will return to this PR when I have time.




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1551658962

   @tenthe Those are very useful suggestions. Thanks! I'm currently fixing some problems.




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1628929665

   @tenthe @dominikriemer Thanks for your review. Please take a final look, and feel free to merge it if you are both OK with this PR.




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie merged PR #1576:
URL: https://github.com/apache/streampipes/pull/1576




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "dominikriemer (via GitHub)" <gi...@apache.org>.
dominikriemer commented on code in PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#discussion_r1257523426


##########
installer/cli/environments/backend-pulsar:
##########
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[environment:backend-pulsar]
+consul
+couchdb
+influxdb
+ui-custom-conf

Review Comment:
   I think we can add `pulsar` here



##########
installer/cli/environments/pipeline-element-pulsar:
##########
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[environment:pipeline-element-pulsar]
+backend-pulsar
+consul
+couchdb
+ui
+influxdb
+# TODO: Need to add pulsar docker compose

Review Comment:
   Can we remove this TODO and add `pulsar` as a service?





Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on code in PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#discussion_r1258232032


##########
streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.pulsar;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.grounding.PulsarTransportProtocol;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class PulsarConsumer implements EventConsumer {
+
+  private PulsarClient pulsarClient;
+  private Consumer<byte[]> consumer;
+  private PulsarTransportProtocol protocolSettings;
+
+  public PulsarConsumer(PulsarTransportProtocol protocolSettings) {
+    this.protocolSettings = protocolSettings;
+  }
+
+  @Override
+  public void connect(InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
+    try {
+      String serviceURL = "";
+      if (!protocolSettings.getBrokerHostname().startsWith("pulsar://")) {
+        // TODO: Add support for 'pulsar+ssl://'

Review Comment:
   I created an issue to track it: https://github.com/apache/streampipes/issues/1756





Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1627374513

   Thanks @dominikriemer.
   All tests pass now!
   <img width="890" alt="image" src="https://github.com/apache/streampipes/assets/16974619/2cc1edd4-ab4e-44af-a88d-9e5f4ea514cd">
   




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1631060269

   I'm going to merge this PR. Thanks for all your support and review! :)




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1601894564

   Hello there :wave:

   Unfortunately, we didn't hear back from you regarding your pull request, so we're closing it now. Don't worry, you can always reopen the PR at any time if you wish to continue working on it :raised_hands:.

   Please note that the branch associated with this pull request will not be deleted, so you can still access your changes and continue to work on them as needed :computer:.

   Thank you for your contributions to our project, and we hope to see you again soon!




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #1576: Support pulsar messasging layer
URL: https://github.com/apache/streampipes/pull/1576




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on code in PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#discussion_r1258235325


##########
streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarProducer.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.pulsar;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.model.grounding.PulsarTransportProtocol;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.io.Serializable;
+
+public class PulsarProducer implements EventProducer, Serializable {
+
+  private PulsarClient pulsarClient;
+  private Producer<byte[]> producer;
+  private PulsarTransportProtocol protocolSettings;
+
+  public PulsarProducer(PulsarTransportProtocol protocolSettings) {
+    this.protocolSettings = protocolSettings;
+  }
+
+  @Override
+  public void connect() throws SpRuntimeException {
+    try {
+      // TODO: the pulsarClient may need to move to PulsarTransportProtocol

Review Comment:
   Here is the issue to track it: https://github.com/apache/streampipes/issues/1757
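
As background for the TODO above: the quoted hunk shows that each `PulsarProducer` holds its own `pulsarClient` field. One possible direction, sketched here under stated assumptions and not taken from the PR or the linked issue, is to share one client per service URL through a small registry. `SharedClientRegistry`, `clientFor`, and `size` are hypothetical names; the type parameter `C` stands in for the client type so the sketch stays free of external dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry (not part of the PR): one shared client instance per service URL. */
final class SharedClientRegistry<C> {

  private final Map<String, C> clients = new ConcurrentHashMap<>();
  private final Function<String, C> factory;

  /** The factory creates a client for a service URL; it runs at most once per URL. */
  SharedClientRegistry(Function<String, C> factory) {
    this.factory = factory;
  }

  /** Returns the cached client for the URL, creating it on first use. */
  C clientFor(String serviceUrl) {
    return clients.computeIfAbsent(serviceUrl, factory);
  }

  /** Number of distinct service URLs for which a client has been created. */
  int size() {
    return clients.size();
  }
}
```

In a real integration the factory would presumably wrap the Pulsar client builder and translate its checked `PulsarClientException` into `SpRuntimeException`; closing the shared clients on shutdown is left out of this sketch.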





Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "tenthe (via GitHub)" <gi...@apache.org>.
tenthe commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1551598844

   Hey @RobertIndie,
   that looks good already. To check if everything works, you can run the e2e tests locally.
   You can start StreamPipes (configured with Pulsar) and run `npm run test-cypress-smoke`.




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "dominikriemer (via GitHub)" <gi...@apache.org>.
dominikriemer commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1625763770

   Hi @RobertIndie I think I've fixed the Kafka smoke test by changing the input type for the topic selection to 'checkbox'. My guess is that the test failed every time when the returned topic list only contained a single topic, so good catch ;-)
   
   I tested the Pulsar setup by adding the official docker-compose file from Pulsar to the CLI and two new environments, which worked without problems. Maybe you can have a look at the docker-compose configuration and see if it needs any changes.
   
   My changes are in https://github.com/apache/streampipes/tree/add-pulsar
   
   Great work!
    




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1621963267

   When I run `npm run test-cypress-smoke`, all tests pass except for `cypress/tests/thirdparty/Kafka.smoke.spec.ts`:
   ```
    1) Test Kafka Integration
          Perform Test:
        CypressError: `cy.type()` failed because it requires a valid typeable element.
   
   The element typed into was:
   
     > `<label _ngcontent-bbr-c328="" style="font-weight: normal;">cypress...</label>`
   
     A typeable element matches one of the following selectors:
     `a[href]`
     `area[href]`
     `input`
     `select`
     `textarea`
     `button`
     `iframe`
     `[tabindex]`
     `[contenteditable]`
   
   https://on.cypress.io/type
         at <unknown> (http://localhost/__cypress/runner/cypress_runner.js:134708:84)
         at tryCatcher (http://localhost/__cypress/runner/cypress_runner.js:8914:23)
         at Promise._settlePromiseFromHandler (http://localhost/__cypress/runner/cypress_runner.js:6849:31)
         at Promise._settlePromise (http://localhost/__cypress/runner/cypress_runner.js:6906:18)
         at Promise._settlePromise0 (http://localhost/__cypress/runner/cypress_runner.js:6951:10)
         at Promise._settlePromises (http://localhost/__cypress/runner/cypress_runner.js:7031:18)
         at _drainQueueStep (http://localhost/__cypress/runner/cypress_runner.js:3621:12)
         at _drainQueue (http://localhost/__cypress/runner/cypress_runner.js:3614:9)
         at ../../node_modules/bluebird/js/release/async.js.Async._drainQueues (http://localhost/__cypress/runner/cypress_runner.js:3630:5)
         at Async.drainQueues (http://localhost/__cypress/runner/cypress_runner.js:3500:14)
     From Your Spec Code:
         at eval (webpack://apache-streampipes/./cypress/support/utils/StaticPropertyUtils.ts:51:43)
     at Array.forEach (<anonymous>)
         at StaticPropertyUtils.input (webpack://apache-streampipes/./cypress/support/utils/StaticPropertyUtils.ts:24:16)
         at ConnectUtils.configureAdapter (webpack://apache-streampipes/./cypress/support/utils/connect/ConnectUtils.ts:119:28)
         at ConnectUtils.testAdapter (webpack://apache-streampipes/./cypress/support/utils/connect/ConnectUtils.ts:34:21)
         at ThirdPartyIntegrationUtils.runTest (webpack://apache-streampipes/./cypress/support/utils/ThirdPartyIntegrationUtils.ts:44:21)
         at Context.eval (webpack://apache-streampipes/./cypress/tests/thirdparty/Kafka.smoke.spec.ts:64:35)
   ```
   
   Here is the screenshot:
   ![Test Kafka Integration -- Perform Test (failed)](https://github.com/apache/streampipes/assets/16974619/481b5f6c-548a-4980-8cbb-a745efdc1285)
   
   




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "dominikriemer (via GitHub)" <gi...@apache.org>.
dominikriemer commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1622395699

   Hi @RobertIndie, awesome to have Pulsar supported 😄
   I had similar Cypress problems in the other branch where I upgraded Angular. I'll have a look tomorrow and see if we can port the change to your branch!




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#issuecomment-1590300257

   Hello there :wave:

   We noticed that it's been some time since activity occurred on your pull request :thinking:. In order to keep things moving forward, we're marking this PR as _stale_ and giving you 7 days to respond before it's automatically closed :alarm_clock:.

   Please take a moment to review your pull request and make any necessary updates or changes :man_technologist:. If you need more time or have any questions, please don't hesitate to let us know :speech_balloon:.

   Thank you for your contributions to our project, and we look forward to hearing back from you soon :pray:.




Re: [PR] Support pulsar messasging layer (streampipes)

Posted by "tenthe (via GitHub)" <gi...@apache.org>.
tenthe commented on code in PR #1576:
URL: https://github.com/apache/streampipes/pull/1576#discussion_r1257724377


##########
streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarProducer.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.pulsar;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.model.grounding.PulsarTransportProtocol;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.io.Serializable;
+
+public class PulsarProducer implements EventProducer, Serializable {
+
+  private PulsarClient pulsarClient;
+  private Producer<byte[]> producer;
+  private PulsarTransportProtocol protocolSettings;
+
+  public PulsarProducer(PulsarTransportProtocol protocolSettings) {
+    this.protocolSettings = protocolSettings;
+  }
+
+  @Override
+  public void connect() throws SpRuntimeException {
+    try {
+      // TODO: the pulsarClient may need to move to PulsarTransportProtocol

Review Comment:
   Same for this



##########
streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.pulsar;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.grounding.PulsarTransportProtocol;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class PulsarConsumer implements EventConsumer {
+
+  private PulsarClient pulsarClient;
+  private Consumer<byte[]> consumer;
+  private PulsarTransportProtocol protocolSettings;
+
+  public PulsarConsumer(PulsarTransportProtocol protocolSettings) {
+    this.protocolSettings = protocolSettings;
+  }
+
+  @Override
+  public void connect(InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
+    try {
+      String serviceURL = "";
+      if (!protocolSettings.getBrokerHostname().startsWith("pulsar://")) {
+        // TODO: Add support for 'pulsar+ssl://'

Review Comment:
   Is it OK if we remove this TODO and create an issue for it instead?


