Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/05/15 04:37:15 UTC

[GitHub] [samza] dnishimura commented on a change in pull request #1030: SAMZA-2192: Add StartpointVisitor implementation for EventHub.

dnishimura commented on a change in pull request #1030: SAMZA-2192: Add StartpointVisitor implementation for EventHub.
URL: https://github.com/apache/samza/pull/1030#discussion_r284080935
 
 

 ##########
 File path: samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
 ##########
 @@ -205,4 +234,75 @@ public static Integer compareOffsets(String offset1, String offset2) {
   public Integer offsetComparator(String offset1, String offset2) {
     return compareOffsets(offset1, offset2);
   }
+
+  /**
+   * Offers an EventHub-specific implementation of {@link StartpointVisitor} that resolves
+   * different types of {@link Startpoint} to a Samza offset.
+   */
+  @VisibleForTesting
+  static class EventHubSamzaOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
+
+    private final EventHubSystemAdmin eventHubSystemAdmin;
+    private final EventHubConfig eventHubConfig;
+
+    EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) {
+      this.eventHubSystemAdmin = eventHubSystemAdmin;
+      this.eventHubConfig = eventHubConfig;
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      return startpointSpecific.getSpecificOffset();
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      String streamName = systemStreamPartition.getStream();
+      EventHubClientManager eventHubClientManager = eventHubSystemAdmin.getOrCreateStreamEventHubClient(streamName);
+      EventHubClient eventHubClient = eventHubClientManager.getEventHubClient();
+
+      PartitionReceiver partitionReceiver = null;
+      try {
+        // 1. Initialize the arguments required for creating the partition receiver.
+        String partitionId = String.valueOf(systemStreamPartition.getPartition().getPartitionId());
+        Instant epochInMillisInstant = Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset());
+        EventPosition eventPosition = EventPosition.fromEnqueuedTime(epochInMillisInstant);
+        String consumerGroup = eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamName);
+
+        // 2. Create a partition receiver with event position defined by the timestamp.
+        partitionReceiver = eventHubClient.createReceiverSync(consumerGroup, partitionId, eventPosition);
+
+        // 3. Read a single message from the partition receiver.
+        Iterable<EventData> eventHubMessagesIterator = partitionReceiver.receiveSync(1);
+        ArrayList<EventData> eventHubMessageList = Lists.newArrayList(eventHubMessagesIterator);
+
+        // 4. Validate that a single message was fetched from the broker.
+        Preconditions.checkState(eventHubMessageList.size() == 1, "Failed to read messages from EventHub system.");
 
 Review comment:
   Does it make sense to fail if the partition is empty?
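
   One possible alternative to failing, sketched in isolation: treat "no message
   received" as a normal outcome and fall back to a caller-supplied offset. This is
   a minimal sketch, not the PR's code. Plain strings stand in for the offsets that
   would be read from EventData, and the class name, method name, and fallback
   parameter are all hypothetical. The null check assumes the receive call may
   return null rather than an empty batch when no events are available, which
   should be verified against the EventHub client documentation.

```java
// Hypothetical helper, not part of Samza or the EventHub SDK.
final class EmptyPartitionOffsetSketch {

  private EmptyPartitionOffsetSketch() {
  }

  /**
   * Returns the first received offset, or fallbackOffset when nothing was received.
   *
   * @param receivedOffsets offsets of the messages fetched from the partition;
   *                        may be null or empty when the partition has no
   *                        message at or after the requested timestamp
   * @param fallbackOffset  offset to use when nothing was received (assumption:
   *                        the caller chooses a suitable value)
   */
  static String resolveOffset(Iterable<String> receivedOffsets, String fallbackOffset) {
    // Assumption: a null batch means "no events", so it is not treated as an error.
    if (receivedOffsets == null) {
      return fallbackOffset;
    }
    // Only the first message is of interest; return as soon as one is seen.
    for (String offset : receivedOffsets) {
      return offset;
    }
    // Empty batch: the partition is empty or has nothing after the timestamp.
    return fallbackOffset;
  }
}
```

   With this shape, the Preconditions.checkState call would no longer be needed for
   the empty case. Which fallback is appropriate (for example, an end-of-stream
   marker) is a design decision for the PR and is left open here.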
