Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/05/15 21:23:13 UTC

[GitHub] [samza] shanthoosh commented on a change in pull request #1030: SAMZA-2192: Add StartpointVisitor implementation for EventHub.

shanthoosh commented on a change in pull request #1030: SAMZA-2192: Add StartpointVisitor implementation for EventHub.
URL: https://github.com/apache/samza/pull/1030#discussion_r284455548
 
 

 ##########
 File path: samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
 ##########
 @@ -205,4 +234,75 @@ public static Integer compareOffsets(String offset1, String offset2) {
   public Integer offsetComparator(String offset1, String offset2) {
     return compareOffsets(offset1, offset2);
   }
+
+  /**
+   * Offers a eventhub specific implementation of {@link StartpointVisitor} that resolves
+   * different types of {@link Startpoint} to samza offset.
+   */
+  @VisibleForTesting
+  static class EventHubSamzaOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
+
+    private final EventHubSystemAdmin eventHubSystemAdmin;
+    private final EventHubConfig eventHubConfig;
+
+    EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) {
+      this.eventHubSystemAdmin = eventHubSystemAdmin;
+      this.eventHubConfig = eventHubConfig;
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      return startpointSpecific.getSpecificOffset();
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      String streamName = systemStreamPartition.getStream();
+      EventHubClientManager eventHubClientManager = eventHubSystemAdmin.getOrCreateStreamEventHubClient(streamName);
+      EventHubClient eventHubClient = eventHubClientManager.getEventHubClient();
+
+      PartitionReceiver partitionReceiver = null;
+      try {
+        // 1. Initialize the arguments required for creating the partition receiver.
+        String partitionId = String.valueOf(systemStreamPartition.getPartition().getPartitionId());
+        Instant epochInMillisInstant = Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset());
+        EventPosition eventPosition = EventPosition.fromEnqueuedTime(epochInMillisInstant);
+        String consumerGroup = eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamName);
+
+        // 2. Create a partition receiver with event position defined by the timestamp.
+        partitionReceiver = eventHubClient.createReceiverSync(consumerGroup, partitionId, eventPosition);
+
+        // 3. Read a single message from the partition receiver.
+        Iterable<EventData> eventHubMessagesIterator = partitionReceiver.receiveSync(1);
+        ArrayList<EventData> eventHubMessageList = Lists.newArrayList(eventHubMessagesIterator);
+
+        // 4. Validate that a single message was fetched from the broker.
+        Preconditions.checkState(eventHubMessageList.size() == 1, "Failed to read messages from EventHub system.");
 
 Review comment:
   1. My initial idea was to fall back to either the oldest or the newest offset when startpoint resolution fails. However, depending on the timestamp the user wants to seek to for a partition, either choice could be incorrect. Say the user wants to seek to a timestamp close to the beginning offset and no offset exists for it. If we default to newest, the result is diametrically opposite to what the user wants (and vice versa).
   2. In case of startpoint resolution failure, we fall back to the checkpointed offset in `OffsetManager`, essentially ignoring the startpoint.
   3. The ideal solution would let the user define the default.offset policy for startpoint resolution failures and use it in the `OffsetManager`. But building that beforehand, before we have concrete use cases or asks, might be unfruitful.
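
To make option 3 concrete, here is a rough sketch of what a user-defined fallback could look like. Every name in it (`StartpointFallbackSketch`, `FallbackPolicy`, `chooseOffset`) is hypothetical and exists in neither Samza nor this PR. It assumes the resolver reports a failed resolution as an empty `Optional` rather than throwing, and it defaults to the checkpointed offset as described in point 2.

```java
import java.util.Optional;

// Hypothetical sketch only: none of these types exist in Samza or in this PR.
class StartpointFallbackSketch {

  // Assumed policy names for what to do when a startpoint cannot be resolved.
  enum FallbackPolicy { CHECKPOINT, OLDEST, NEWEST }

  /**
   * Picks the offset to start from.
   *
   * @param resolvedStartpointOffset offset resolved from the startpoint, empty if resolution failed
   * @param policy user-chosen fallback policy (hypothetical config)
   * @param checkpointedOffset last checkpointed offset for the partition
   * @param oldestOffset oldest available offset for the partition
   * @param newestOffset newest available offset for the partition
   */
  static String chooseOffset(Optional<String> resolvedStartpointOffset,
                             FallbackPolicy policy,
                             String checkpointedOffset,
                             String oldestOffset,
                             String newestOffset) {
    // A successfully resolved startpoint always wins over any fallback.
    if (resolvedStartpointOffset.isPresent()) {
      return resolvedStartpointOffset.get();
    }
    switch (policy) {
      case OLDEST:
        return oldestOffset;
      case NEWEST:
        return newestOffset;
      case CHECKPOINT:
      default:
        // Matches point 2: ignore the startpoint and resume from the checkpoint.
        return checkpointedOffset;
    }
  }
}
```

With `FallbackPolicy.CHECKPOINT` as the default, behavior stays the same as in point 2, and a user who knows their timestamps sit near the start of the stream could opt into `OLDEST` explicitly.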
   
   What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services