You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2023/06/14 14:48:53 UTC

[samza] branch master updated: - Fix ReceiverOptions parameter usage (#1670)

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 03d70a5a1 - Fix ReceiverOptions parameter usage (#1670)
03d70a5a1 is described below

commit 03d70a5a19398b8087ab9e2de5319d027438e56f
Author: Eric Honer <eh...@linkedin.com>
AuthorDate: Wed Jun 14 07:48:46 2023 -0700

    - Fix ReceiverOptions parameter usage (#1670)
    
    - Change powermock parameters
---
 .../samza/system/eventhub/consumer/EventHubSystemConsumer.java   | 9 +++++----
 .../samza/system/eventhub/MockEventHubClientManagerFactory.java  | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 91eab33ee..6fe2c72da 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -283,9 +283,10 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
           // If no such offset exists Eventhub will return an error.
           ReceiverOptions receiverOptions = new ReceiverOptions();
           receiverOptions.setPrefetchCount(prefetchCount);
+          EventPosition position = EventPosition.fromOffset(offset, /* inclusiveFlag */false);
           receiver = eventHubClientManager.getEventHubClient()
-              .createReceiver(consumerGroup, partitionId.toString(),
-                  EventPosition.fromOffset(offset, /* inclusiveFlag */false)).get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+              .createReceiver(consumerGroup, partitionId.toString(), position, receiverOptions)
+              .get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         }
 
         PartitionReceiveHandler handler =
@@ -378,9 +379,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
       // Recreate receiver
       ReceiverOptions receiverOptions = new ReceiverOptions();
       receiverOptions.setPrefetchCount(prefetchCount);
+      EventPosition position = EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM));
       PartitionReceiver receiver = eventHubClientManager.getEventHubClient()
-          .createReceiverSync(consumerGroup, partitionId.toString(),
-              EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)), receiverOptions);
+          .createReceiverSync(consumerGroup, partitionId.toString(), position, receiverOptions);
 
       // Timeout for EventHubClient receive
       receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 2d071518c..240a1d519 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -138,7 +138,7 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
                 return CompletableFuture.completedFuture(mockPartitionReceiver);
               });
 
-        PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
+        PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject(), anyObject()))
               .then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
                 String partitionId = invocationOnMock.getArgumentAt(1, String.class);
                 EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);