You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/23 22:17:30 UTC

[GitHub] [pulsar] nlu90 opened a new pull request #11056: [pulsar-io] pass pulsar client via context to connector

nlu90 opened a new pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056


   <!--
   ### Contribution Checklist
     
     - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
       Skip *Issue XYZ* if there is no associated github issue for this pull request.
       Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   *(If this PR fixes a github issue, please add `Fixes #<xyz>`.)*
   
   Fixes #<xyz>
   
   *(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)*
   
   Master Issue: #<xyz>
   
   ### Motivation
   
   solve #8668
   
   ### Modifications
   
   Expose `PulsarClient` via `BaseContext`, and let connectors use the inherited pulsar client from function worker
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
     - The public API: `SourceContext` and `SinkContext` need to implement the `getPulsarClient` method
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-870087560






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-868697989


   @eolivelli the auth data the pulsar client that is instantiated in a pulsar instance has should be properly scoped.  Usually it is inherits the credentials of the the source/sink/function submitter.  Thus, the instance will only be able to perform the operations that it is authorized to perform.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie merged pull request #11056: PIP-85: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
sijie merged pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-871945639


   for reference
   https://github.com/apache/pulsar/wiki/PIP-85:-Expose-Pulsar-Client-via-Function-Connector-BaseContext


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#discussion_r658212479



##########
File path: pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -167,55 +177,53 @@ public void seekPauseResumeTest() throws Exception {
         assertEquals(MessageIdUtils.getOffset(msgId), sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.offset(tp, 0);
-        verify(mockCtx, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
+        verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
         assertEquals(0, sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.pause(tp);
-        verify(mockCtx, times(1)).pause(tp.topic(), tp.partition());
+        verify(context, times(1)).pause(tp.topic(), tp.partition());
         sink.taskContext.resume(tp);
-        verify(mockCtx, times(1)).resume(tp.topic(), tp.partition());
+        verify(context, times(1)).resume(tp.topic(), tp.partition());
 
         sink.close();
     }
 
 
     @Test
     public void subscriptionTypeTest() throws Exception {
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
-            log.info("Exclusive is allowed");
-            sink.open(props, mockCtx);
+            log.info("Failover is allowed");

Review comment:
       Why are you changing this test?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on pull request #11056: PIP-85: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-873119126


   /pulsarbot run-failure-checks
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-869996839


   Hi, seems the CI check failed for some weird reason. 
   
   Can anyone re-run all the test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#discussion_r659501139



##########
File path: pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -167,55 +177,53 @@ public void seekPauseResumeTest() throws Exception {
         assertEquals(MessageIdUtils.getOffset(msgId), sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.offset(tp, 0);
-        verify(mockCtx, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
+        verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
         assertEquals(0, sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.pause(tp);
-        verify(mockCtx, times(1)).pause(tp.topic(), tp.partition());
+        verify(context, times(1)).pause(tp.topic(), tp.partition());
         sink.taskContext.resume(tp);
-        verify(mockCtx, times(1)).resume(tp.topic(), tp.partition());
+        verify(context, times(1)).resume(tp.topic(), tp.partition());
 
         sink.close();
     }
 
 
     @Test
     public void subscriptionTypeTest() throws Exception {
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
-            log.info("Exclusive is allowed");
-            sink.open(props, mockCtx);
+            log.info("Failover is allowed");

Review comment:
       makes sense




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-870109456


   > So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.
   
   Agree on having a PIP. but it doesn't mean that we shouldn't include the Pulsar client API. Class loading is a different problem that we need to solve. We don't necessarily need to couple different issues into one discussion.
   
   @nlu90 Can you send out a PIP to dev@ mailing list for this new API?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-871686617


   > > So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.
   > 
   > Agree on having a PIP. but it doesn't mean that we shouldn't include the Pulsar client API. Class loading is a different problem that we need to solve. We don't necessarily need to couple different issues into one discussion.
   > 
   > @nlu90 Can you send out a PIP to dev@ mailing list for this new API?
   
   yeah, I just sent the PIP to dev mailing list. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-869996839


   Hi, seems the CI check failed for some weird reason. 
   
   Can anyone re-run all the test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 edited a comment on pull request #11056: PIP-85: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 edited a comment on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-872766181


   The test failed with the following error, which I think its not related to the PR chagnes.
   ```
   Error:  Failures: 
   Error:  org.apache.pulsar.client.api.TokenAuthenticatedProducerConsumerTest.testTokenProducerAndConsumer(org.apache.pulsar.client.api.TokenAuthenticatedProducerConsumerTest)
   [INFO]   Run 1: PASS
   Error:    Run 2: TokenAuthenticatedProducerConsumerTest.testTokenProducerAndConsumer » NotAuthorized
   ```
   
   I'll rebase the changes and try again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-870245034


   > > So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.
   > 
   > Agree on having a PIP. but it doesn't mean that we shouldn't include the Pulsar client API. Class loading is a different problem that we need to solve. We don't necessarily need to couple different issues into one discussion.
   
   Agreed. Thanks
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-871762854


   @eolivelli Can you please review and unblock this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] dlg99 commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
dlg99 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-869202066


   @sijie see https://github.com/apache/pulsar/issues/11099 regarding
   > it is already being tested as part of debezium connector integration tests. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on pull request #11056: PIP-85: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-872754565


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] dlg99 commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
dlg99 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-867912926


   also it broke the tests
   ```
   Error:  Tests run: 37, Failures: 1, Errors: 0, Skipped: 27, Time elapsed: 79.186 s <<< FAILURE! - in org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest
   Error:  offsetTest(org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest)  Time elapsed: 2.084 s  <<< FAILURE!
   java.lang.RuntimeException: Failed to create pulsar client to cluster at http://localhost:43199
   	at org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore.start(PulsarOffsetBackingStore.java:159)
   	at org.apache.pulsar.io.kafka.connect.PulsarKafkaSinkTaskContext.<init>(PulsarKafkaSinkTaskContext.java:73)
   	at org.apache.pulsar.io.kafka.connect.KafkaConnectSink.open(KafkaConnectSink.java:163)
   	at org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest.offsetTest(KafkaConnectSinkTest.java:439)
   ...
   Caused by: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Client already closed : state = Closed
   	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:975)
   	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:95)
   	at org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore.start(PulsarOffsetBackingStore.java:147)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] freeznet commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
freeznet commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-870259714


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-869390714


   @jerrypeng thanks for your clarification. 
   
   @sijie 
   I am totally fine with adding this feature, and also I am very happy to see the Debezium problem fixed.
   
   But my point is that here we are adding again a new powerful API to the Pulsar Functions/IO system.
   **So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.**
   This way the project/community will be aware of what's going on and have the chance to participate into the discussion
   for a new API.
   A PR is not good for such high level discussions, because not everyone is following the huge quantity of messages due to PR.
   
   We said many times during the past months (cc @merlimat) that we should use better the tool of the PIPs and we should add new APIs more carefully, being sure that the community is aware of what is happening. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on pull request #11056: PIP-85: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-872766181


   The test failed with the following error, which I think its not related to the PR chagnes.
   ```
   Error:  Failures: 
   Error:  org.apache.pulsar.client.api.TokenAuthenticatedProducerConsumerTest.testTokenProducerAndConsumer(org.apache.pulsar.client.api.TokenAuthenticatedProducerConsumerTest)
   [INFO]   Run 1: PASS
   Error:    Run 2: TokenAuthenticatedProducerConsumerTest.testTokenProducerAndConsumer » NotAuthorized
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] freeznet commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
freeznet commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-870259714


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 removed a comment on pull request #11056: PIP-85: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 removed a comment on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-873119126


   /pulsarbot run-failure-checks
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-870109456


   > So this kind of changes in my humble option MUST be discussed properly on the dev@ mailing list.
   
   Agree on having a PIP. but it doesn't mean that we shouldn't include the Pulsar client API. Class loading is a different problem that we need to solve. We don't necessarily need to couple different issues into one discussion.
   
   @nlu90 Can you send out a PIP to dev@ mailing list for this new API?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-869392014


   @sijie the "classpath problem" is not solved.
   Here it is @merlimat 's patch, that is still not committed.
   https://github.com/apache/pulsar/pull/10922
   
   With this change we are going to add more access to the Pulsar API, but not every API that is needed to leverage the Pulsar client is working.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] dlg99 commented on a change in pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#discussion_r658230332



##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -136,9 +141,6 @@ void processMessage(Message<byte[]> message) {
     @Override
     public void start() {
         try {
-            client = PulsarClient.builder()
-                .serviceUrl(serviceUrl)
-                .build();
             log.info("Successfully created pulsar client to {}", serviceUrl);

Review comment:
       this log line is irrelevant now

##########
File path: pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
##########
@@ -191,4 +192,11 @@
      * @param value The value of the metric
      */
     void recordMetric(String metricName, double value);
+
+    /**
+     * Get the pulsar client.
+     *
+     * @return the instance of pulsar client
+     */
+    PulsarClient getPulsarClient();

Review comment:
       add default implementation throwing NotImpelmentedException?

##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -136,9 +141,6 @@ void processMessage(Message<byte[]> message) {
     @Override
     public void start() {
         try {
-            client = PulsarClient.builder()
-                .serviceUrl(serviceUrl)
-                .build();
             log.info("Successfully created pulsar client to {}", serviceUrl);

Review comment:
       please remove `serviceUrl` - it is no longer needed and adjust the rest of logging accordingly.
   Can remove it from the config definitions as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-870087560


   /pulsarbot rerun-failure-checks 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-868115125


   @eolivelli 
   
   > There are some security concerns because the client we be able to leverage all of the resources of the client.
   
   What are the security concerns? I think we have discussed that in one of the community meetings talking about the PulsarAdmin interface. The function uses the credentials/token to access the Pulsar resources. The credentials already limit the access on what resources that the function can access. I don't see why and how expose PulsarAdmin or PulsarClient can introduce security issues.
   
   Also we have to ensure that the API that we expose will work correctly.
   
   > This patch also touches the Kafka integration part, probably it is better to split the patch into two parts.
   
   The main reason for this change is to solve the problem of the Debezium connector. A Debezium connector is a wrapper over Kafka connector. I don't see why we need to split that into two parts.
   
   > we added new API like this one (see the Pulsar Admin API support) and it was a pain.
   > There are pending works about allowing to use of the Pulsar Client API inside functions and IO connectors because we have classpath problems.
   
   We have fixed the class loading problem. If we don't encounter class loading problems, I don't know why this would be a concern.
   
   > define clearly how functions and IO connectors must refer to the Pulsar API, for instance currently many connectors must bundle Pulsar client jars inside the nar file
   
   I don't see how is that related to this pull request. These are two different issues. Please don't couple them together.
   
   > add integration tests that cover the list of supported API in the client (do we have to fully support the client API?)
   
   This change is used for replacing the Pulsar client in the Pulsar offset store, which already points to the same Pulsar cluster.  So it is already being tested as part of debezium connector integration tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on a change in pull request #11056: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on a change in pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#discussion_r658299807



##########
File path: pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -167,55 +177,53 @@ public void seekPauseResumeTest() throws Exception {
         assertEquals(MessageIdUtils.getOffset(msgId), sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.offset(tp, 0);
-        verify(mockCtx, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
+        verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
         assertEquals(0, sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.pause(tp);
-        verify(mockCtx, times(1)).pause(tp.topic(), tp.partition());
+        verify(context, times(1)).pause(tp.topic(), tp.partition());
         sink.taskContext.resume(tp);
-        verify(mockCtx, times(1)).resume(tp.topic(), tp.partition());
+        verify(context, times(1)).resume(tp.topic(), tp.partition());
 
         sink.close();
     }
 
 
     @Test
     public void subscriptionTypeTest() throws Exception {
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
-            log.info("Exclusive is allowed");
-            sink.open(props, mockCtx);
+            log.info("Failover is allowed");

Review comment:
       I switched the order of the first two tests since by default the subscriptiontype is `failover`. The exclusive test is actually moved to line 199

##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -136,9 +141,6 @@ void processMessage(Message<byte[]> message) {
     @Override
     public void start() {
         try {
-            client = PulsarClient.builder()
-                .serviceUrl(serviceUrl)
-                .build();
             log.info("Successfully created pulsar client to {}", serviceUrl);

Review comment:
       Yeah, plan to remove the `serviceUrl` after the current unit test fix. Thanks for the reminding.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on pull request #11056: PIP-85: [pulsar-io] pass pulsar client via context to connector

Posted by GitBox <gi...@apache.org>.
nlu90 commented on pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#issuecomment-873131755


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org