Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/12 10:19:45 UTC

[GitHub] [pulsar-client-reactive] lhotari opened a new pull request, #112: Add sendManyCorrelated to ReactiveMessageSender

lhotari opened a new pull request, #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112

   Fixes #22


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-reactive] lhotari commented on a diff in pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112#discussion_r1087488974


##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/CorrelatedMessageSendingException.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import org.reactivestreams.Publisher;
+
+/**
+ * A wrapper exception used by {@link ReactiveMessageSender#sendMany(Publisher)} and
+ * {@link ReactiveMessageSender#sendManyCorrelated(Publisher)}. The
+ * {@link #getCorrelationKey(Class)} method will return the correlation information to
+ * find out which message failed to be sent.
+ */
+public class CorrelatedMessageSendingException extends RuntimeException {
+
+	private final Object correlationKey;
+
+	public CorrelatedMessageSendingException(Throwable cause, Object correlationKey) {
+		super(cause);
+		this.correlationKey = correlationKey;
+	}
+
+	@Override
+	public Throwable fillInStackTrace() {
+		return this;
+	}
+
+	public <T> T getCorrelationKey(Class<T> correlationKeyType) {

Review Comment:
   
   Returning `Object` is very uncommon in modern Java APIs. I like how Reactor solves the ugliness of needing to cast in code. In Reactor, there's [Context](https://projectreactor.io/docs/core/release/api/reactor/util/context/Context.html) and [ContextView](https://projectreactor.io/docs/core/release/api/reactor/util/context/ContextView.html). Please check ContextView for examples.
   





[GitHub] [pulsar-client-reactive] lhotari closed pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari closed pull request #112: Add sendManyCorrelated to ReactiveMessageSender
URL: https://github.com/apache/pulsar-client-reactive/pull/112




[GitHub] [pulsar-client-reactive] lhotari commented on a diff in pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112#discussion_r1086430674


##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java:
##########
@@ -40,11 +41,30 @@
 
 	/**
 	 * Send multiple messages and get the associated message ids in the same order as the
-	 * messages sent.
+	 * messages sent. A send error will terminate the returned Flux with an error that is
+	 * wrapped in a {@link CorrelatedMessageSendingException} where the
+	 * {@link CorrelatedMessageSendingException#getCorrelationKey(Class)} method will
+	 * return the MessageSpec sent as input.
 	 * @param messageSpecs the specs of the messages to send
 	 * @return a publisher that will emit a message id per message successfully sent in
 	 * the order that they have been sent
 	 */
 	Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
 
+	/**
+	 * Send multiple messages and correlate the resulted message ids to a provided
+	 * correlation key. The correlation key can be any type of object. A send error will
+	 * terminate the returned Flux with an error that is wrapped in a
+	 * {@link CorrelatedMessageSendingException} where the
+	 * {@link CorrelatedMessageSendingException#getCorrelationKey(Class)} method will
+	 * return the correlation key sent as input.
+	 * @param tuplesOfCorrelationKeyAndMessageSpec a publisher where the element is a
+	 * tuple of the correlation key and the specs of the message to send
+	 * @param <K> type of correlation key
+	 * @return a publisher that will emit a tuple of the provided correlation key and the
+	 * message id for each message successfully sent
+	 */
+	<K> Flux<Tuple2<K, MessageId>> sendManyCorrelated(

Review Comment:
   I think that having the concept of correlation as part of the method name makes it easier to understand the purpose.





[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112#discussion_r1047540636


##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java:
##########
@@ -40,11 +41,30 @@
 
 	/**
 	 * Send multiple messages and get the associated message ids in the same order as the
-	 * messages sent.
+	 * messages sent. A send error will terminate the returned Flux with an error that is
+	 * wrapped in a {@link CorrelatedMessageSendingException} where the
+	 * {@link CorrelatedMessageSendingException#getCorrelationKey(Class)} method will
+	 * return the MessageSpec sent as input.
 	 * @param messageSpecs the specs of the messages to send
 	 * @return a publisher that will emit a message id per message successfully sent in
 	 * the order that they have been sent
 	 */
 	Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
 
+	/**
+	 * Send multiple messages and correlate the resulted message ids to a provided
+	 * correlation key. The correlation key can be any type of object. A send error will
+	 * terminate the returned Flux with an error that is wrapped in a
+	 * {@link CorrelatedMessageSendingException} where the
+	 * {@link CorrelatedMessageSendingException#getCorrelationKey(Class)} method will
+	 * return the correlation key sent as input.
+	 * @param tuplesOfCorrelationKeyAndMessageSpec a publisher where the element is a
+	 * tuple of the correlation key and the specs of the message to send
+	 * @param <K> type of correlation key
+	 * @return a publisher that will emit a tuple of the provided correlation key and the
+	 * message id for each message successfully sent
+	 */
+	<K> Flux<Tuple2<K, MessageId>> sendManyCorrelated(

Review Comment:
   Do we need to call it `sendManyCorrelated`, or could we call it `sendMany`?





[GitHub] [pulsar-client-reactive] lhotari commented on a diff in pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112#discussion_r1087490034


##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/CorrelatedMessageSendingException.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import org.reactivestreams.Publisher;
+
+/**
+ * A wrapper exception used by {@link ReactiveMessageSender#sendMany(Publisher)} and
+ * {@link ReactiveMessageSender#sendManyCorrelated(Publisher)}. The
+ * {@link #getCorrelationKey(Class)} method will return the correlation information to
+ * find out which message failed to be sent.
+ */
+public class CorrelatedMessageSendingException extends RuntimeException {
+
+	private final Object correlationKey;
+
+	public CorrelatedMessageSendingException(Throwable cause, Object correlationKey) {
+		super(cause);
+		this.correlationKey = correlationKey;
+	}
+
+	@Override
+	public Throwable fillInStackTrace() {
+		return this;
+	}
+
+	public <T> T getCorrelationKey(Class<T> correlationKeyType) {

Review Comment:
   In this case, the need to pass the `Class` might be extra overhead and the signature could be simply `<T> T getCorrelationKey();`.
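
   A minimal sketch of the signature suggested above, assuming a simplified stand-in class. `UncheckedKeyException` is a hypothetical name used only for illustration and is not the class from the diff. The cast is unchecked, so a wrong type surfaces as a `ClassCastException` at the caller's assignment rather than inside the accessor.

```java
// Hypothetical stand-in for the exception in the diff; not the project's actual class.
final class UncheckedKeyException extends RuntimeException {

	private final Object correlationKey;

	UncheckedKeyException(Throwable cause, Object correlationKey) {
		super(cause);
		this.correlationKey = correlationKey;
	}

	// T is inferred from the caller's target type. Because of erasure the cast
	// here is unchecked: a mismatch fails where the caller assigns the result.
	@SuppressWarnings("unchecked")
	<T> T getCorrelationKey() {
		return (T) this.correlationKey;
	}

}
```

   With this shape a caller can write `String key = e.getCorrelationKey();` without a `Class` argument or an explicit cast, at the cost of losing the type check inside the method.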





[GitHub] [pulsar-client-reactive] Plawn commented on pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by GitBox <gi...@apache.org>.
Plawn commented on PR #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112#issuecomment-1380467162

   Don't forget this PR :) pls 
   It's what I currently need 




[GitHub] [pulsar-client-reactive] lhotari commented on pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112#issuecomment-1405233924

   replaced by #115 




[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #112: Add sendManyCorrelated to ReactiveMessageSender

Posted by "cbornet (via GitHub)" <gi...@apache.org>.
cbornet commented on code in PR #112:
URL: https://github.com/apache/pulsar-client-reactive/pull/112#discussion_r1086914181


##########
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/CorrelatedMessageSendingException.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import org.reactivestreams.Publisher;
+
+/**
+ * A wrapper exception used by {@link ReactiveMessageSender#sendMany(Publisher)} and
+ * {@link ReactiveMessageSender#sendManyCorrelated(Publisher)}. The
+ * {@link #getCorrelationKey(Class)} method will return the correlation information to
+ * find out which message failed to be sent.
+ */
+public class CorrelatedMessageSendingException extends RuntimeException {
+
+	private final Object correlationKey;
+
+	public CorrelatedMessageSendingException(Throwable cause, Object correlationKey) {
+		super(cause);
+		this.correlationKey = correlationKey;
+	}
+
+	@Override
+	public Throwable fillInStackTrace() {
+		return this;
+	}
+
+	public <T> T getCorrelationKey(Class<T> correlationKeyType) {

Review Comment:
   Is there much value in requiring the class here instead of returning `Object` and letting the user cast on the client side (which might not even be needed, depending on the use case)?
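
   The two options in this question can be compared side by side in a small sketch. `TypedKeyException` is a hypothetical stand-in, not the class from the diff, and the body of the `Class`-based accessor is an assumption, since the diff hunk ends before the method body.

```java
// Hypothetical stand-in for the exception in the diff; not the project's actual class.
final class TypedKeyException extends RuntimeException {

	private final Object correlationKey;

	TypedKeyException(Throwable cause, Object correlationKey) {
		super(cause);
		this.correlationKey = correlationKey;
	}

	// Option 1: return Object and leave any cast to the caller.
	Object getCorrelationKey() {
		return this.correlationKey;
	}

	// Option 2 (assumed body): Class.cast performs a checked cast, so a type
	// mismatch throws ClassCastException inside this accessor.
	<T> T getCorrelationKey(Class<T> correlationKeyType) {
		return correlationKeyType.cast(this.correlationKey);
	}

}
```

   A caller that only logs the key can use the `Object` variant with no cast at all, while `e.getCorrelationKey(String.class)` returns a typed value and rejects a mismatched type at the point of the call.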


