You are viewing a plain-text version of this content; the canonical version is the linked page in the original mail archive.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/16 00:22:45 UTC

[samza] branch master updated: SAMZA-2192: Add StartpointVisitor implementation for EventHub. (#1030)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a589fc  SAMZA-2192: Add StartpointVisitor implementation for EventHub. (#1030)
2a589fc is described below

commit 2a589fcc8b13c0a90f65210185dc86e3dec824e9
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Wed May 15 17:22:40 2019 -0700

    SAMZA-2192: Add StartpointVisitor implementation for EventHub. (#1030)
    
    * Adding initial implementation for EventHubSystemAdmin.
    
    * Code clean up.
---
 .../system/eventhub/admin/EventHubSystemAdmin.java | 112 +++++++++++++++++++--
 .../eventhub/admin/TestEventHubSystemAdmin.java    |  75 ++++++++++++++
 2 files changed, 181 insertions(+), 6 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 44353d2..2d22929 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -19,14 +19,28 @@
 
 package org.apache.samza.system.eventhub.admin;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventPosition;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
 import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.startpoint.Startpoint;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
+import org.apache.samza.startpoint.StartpointVisitor;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
@@ -57,12 +71,27 @@ public class EventHubSystemAdmin implements SystemAdmin {
   private final EventHubConfig eventHubConfig;
   private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
   private final Map<String, String[]> streamPartitions = new HashMap<>();
+  private final EventHubSamzaOffsetResolver eventHubSamzaOffsetResolver;
 
   public EventHubSystemAdmin(String systemName, EventHubConfig eventHubConfig,
                              EventHubClientManagerFactory eventHubClientManagerFactory) {
     this.systemName = systemName;
     this.eventHubConfig = eventHubConfig;
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+    this.eventHubSamzaOffsetResolver = new EventHubSamzaOffsetResolver(this, eventHubConfig);
+  }
+
+  @Override
+  public void stop() {
+    for (Map.Entry<String, EventHubClientManager> entry : eventHubClients.entrySet()) {
+      EventHubClientManager eventHubClientManager = entry.getValue();
+      try {
+        eventHubClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+      } catch (Exception e) {
+        LOG.warn(String.format("Exception occurred when closing EventHubClient of stream: %s.", entry.getKey()), e);
+      }
+    }
+    eventHubClients.clear();
   }
 
   @Override
@@ -121,20 +150,20 @@ public class EventHubSystemAdmin implements SystemAdmin {
         SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(streamName, sspMetadataMap);
         requestedMetadata.put(streamName, systemStreamMetadata);
       }
+      return requestedMetadata;
     } catch (Exception e) {
       String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s", systemName);
       LOG.error(msg, e);
       throw new SamzaException(msg, e);
-    } finally {
-      // Closing clients
-      eventHubClients.forEach((streamName, client) -> client.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
-      eventHubClients.clear();
     }
+  }
 
-    return requestedMetadata;
+  @Override
+  public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    return startpoint.apply(systemStreamPartition, eventHubSamzaOffsetResolver);
   }
 
-  private EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
+  EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
     if (!eventHubClients.containsKey(streamName)) {
       LOG.info(String.format("Creating EventHubClient for Stream=%s", streamName));
 
@@ -205,4 +234,75 @@ public class EventHubSystemAdmin implements SystemAdmin {
   public Integer offsetComparator(String offset1, String offset2) {
     return compareOffsets(offset1, offset2);
   }
+
+  /**
+   * Offers an EventHub-specific implementation of {@link StartpointVisitor} that resolves
+   * each type of {@link Startpoint} to a Samza offset.
+   */
+  @VisibleForTesting
+  static class EventHubSamzaOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
+
+    private final EventHubSystemAdmin eventHubSystemAdmin;
+    private final EventHubConfig eventHubConfig;
+
+    EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) {
+      this.eventHubSystemAdmin = eventHubSystemAdmin;
+      this.eventHubConfig = eventHubConfig;
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      return startpointSpecific.getSpecificOffset();
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      String streamName = systemStreamPartition.getStream();
+      EventHubClientManager eventHubClientManager = eventHubSystemAdmin.getOrCreateStreamEventHubClient(streamName);
+      EventHubClient eventHubClient = eventHubClientManager.getEventHubClient();
+
+      PartitionReceiver partitionReceiver = null;
+      try {
+        // 1. Initialize the arguments required for creating the partition receiver.
+        String partitionId = String.valueOf(systemStreamPartition.getPartition().getPartitionId());
+        Instant epochInMillisInstant = Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset());
+        EventPosition eventPosition = EventPosition.fromEnqueuedTime(epochInMillisInstant);
+        String consumerGroup = eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamName);
+
+        // 2. Create a partition receiver with event position defined by the timestamp.
+        partitionReceiver = eventHubClient.createReceiverSync(consumerGroup, partitionId, eventPosition);
+
+        // 3. Read a single message from the partition receiver.
+        Iterable<EventData> eventHubMessagesIterator = partitionReceiver.receiveSync(1);
+        ArrayList<EventData> eventHubMessageList = Lists.newArrayList(eventHubMessagesIterator);
+
+        // 4. Validate that a single message was fetched from the broker.
+        Preconditions.checkState(eventHubMessageList.size() == 1, "Failed to read messages from EventHub system.");
+
+        // 5. Return the offset present in the metadata of the first message.
+        return eventHubMessageList.get(0).getSystemProperties().getOffset();
+      } catch (EventHubException e) {
+        LOG.error(String.format("Exception occurred when fetching offset for timestamp: %d from the stream: %s", startpointTimestamp.getTimestampOffset(), streamName), e);
+        throw new SamzaException(e);
+      } finally {
+        if (partitionReceiver != null) {
+          try {
+            partitionReceiver.closeSync();
+          } catch (EventHubException e) {
+            LOG.error(String.format("Exception occurred when closing partition-receiver of the stream: %s", streamName), e);
+          }
+        }
+      }
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+      return EventHubSystemConsumer.START_OF_STREAM;
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+      return EventHubSystemConsumer.END_OF_STREAM;
+    }
+  }
 }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index e45d3f4..befbf3a 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -19,11 +19,24 @@
 
 package org.apache.samza.system.eventhub.admin;
 
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import java.util.Arrays;
 import org.apache.samza.Partition;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
 import org.apache.samza.system.eventhub.EventHubSystemFactory;
 import org.apache.samza.system.eventhub.MockEventHubConfigFactory;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin.EventHubSamzaOffsetResolver;
 import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.junit.Assert;
@@ -33,6 +46,7 @@ import org.junit.Test;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.mockito.Mockito;
 
 import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
 
@@ -80,4 +94,65 @@ public class TestEventHubSystemAdmin {
     }
   }
 
+  @Test
+  public void testStartpointResolverShouldResolveTheStartpointOldestToCorrectOffset() {
+    EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
+    EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+    SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+
+    EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+
+    Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointOldest()));
+  }
+
+  @Test
+  public void testStartpointResolverShouldResolveTheStartpointUpcomingToCorrectOffset() {
+    EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
+    EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+    SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+
+    EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+
+    Assert.assertEquals(EventHubSystemConsumer.END_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointUpcoming()));
+  }
+
+  @Test
+  public void testStartpointResolverShouldResolveTheStartpointSpecificToCorrectOffset() {
+    EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
+    EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+    SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+
+    EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+
+    Assert.assertEquals("100", resolver.visit(systemStreamPartition, new StartpointSpecific("100")));
+  }
+
+  @Test
+  public void testStartpointResolverShouldResolveTheStartpointTimestampToCorrectOffset() throws EventHubException {
+    // Initialize variables required for testing.
+    EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
+    EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+    SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+    String mockedOffsetToReturn = "100";
+
+    // Setup the mock variables.
+    EventHubClientManager mockEventHubClientManager = Mockito.mock(EventHubClientManager.class);
+    EventHubClient mockEventHubClient = Mockito.mock(EventHubClient.class);
+    PartitionReceiver mockPartitionReceiver = Mockito.mock(PartitionReceiver.class);
+    EventData mockEventData = Mockito.mock(EventData.class);
+    EventData.SystemProperties mockSystemProperties = Mockito.mock(EventData.SystemProperties.class);
+
+    // Configure the mock variables to return the appropriate values.
+    Mockito.when(mockEventHubSystemAdmin.getOrCreateStreamEventHubClient("test-stream")).thenReturn(mockEventHubClientManager);
+    Mockito.when(mockEventHubClientManager.getEventHubClient()).thenReturn(mockEventHubClient);
+    Mockito.when(mockEventHubClient.createReceiverSync(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(mockPartitionReceiver);
+    Mockito.when(mockPartitionReceiver.receiveSync(1)).thenReturn(Arrays.asList(mockEventData));
+    Mockito.when(mockEventData.getSystemProperties()).thenReturn(mockSystemProperties);
+    Mockito.when(mockSystemProperties.getOffset()).thenReturn(mockedOffsetToReturn);
+
+    // Test the Offset resolver.
+    EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+    String resolvedOffset = resolver.visit(systemStreamPartition, new StartpointTimestamp(100L));
+    Assert.assertEquals(mockedOffsetToReturn, resolvedOffset);
+  }
 }