You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/16 00:22:45 UTC
[samza] branch master updated: SAMZA-2192: Add StartpointVisitor
implementation for EventHub. (#1030)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2a589fc SAMZA-2192: Add StartpointVisitor implementation for EventHub. (#1030)
2a589fc is described below
commit 2a589fcc8b13c0a90f65210185dc86e3dec824e9
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Wed May 15 17:22:40 2019 -0700
SAMZA-2192: Add StartpointVisitor implementation for EventHub. (#1030)
* Adding initial implementation for EventHubSystemAdmin.
* Code clean up.
---
.../system/eventhub/admin/EventHubSystemAdmin.java | 112 +++++++++++++++++++--
.../eventhub/admin/TestEventHubSystemAdmin.java | 75 ++++++++++++++
2 files changed, 181 insertions(+), 6 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 44353d2..2d22929 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -19,14 +19,28 @@
package org.apache.samza.system.eventhub.admin;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventPosition;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
+import org.apache.samza.startpoint.Startpoint;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
+import org.apache.samza.startpoint.StartpointVisitor;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
@@ -57,12 +71,27 @@ public class EventHubSystemAdmin implements SystemAdmin {
private final EventHubConfig eventHubConfig;
private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
private final Map<String, String[]> streamPartitions = new HashMap<>();
+ private final EventHubSamzaOffsetResolver eventHubSamzaOffsetResolver;
public EventHubSystemAdmin(String systemName, EventHubConfig eventHubConfig,
EventHubClientManagerFactory eventHubClientManagerFactory) {
this.systemName = systemName;
this.eventHubConfig = eventHubConfig;
this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+ this.eventHubSamzaOffsetResolver = new EventHubSamzaOffsetResolver(this, eventHubConfig); // the resolver calls back into this admin to obtain EventHub clients
+ }
+
+ @Override
+ public void stop() { // closes every cached EventHub client manager, then empties the cache
+ for (Map.Entry<String, EventHubClientManager> entry : eventHubClients.entrySet()) {
+ EventHubClientManager eventHubClientManager = entry.getValue();
+ try {
+ eventHubClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+ } catch (Exception e) { // best effort: a failure to close one client must not prevent closing the rest
+ LOG.warn(String.format("Exception occurred when closing EventHubClient of stream: %s.", entry.getKey()), e);
+ }
+ }
+ eventHubClients.clear(); // forget the managers that were just closed
}
@Override
@@ -121,20 +150,20 @@ public class EventHubSystemAdmin implements SystemAdmin {
SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(streamName, sspMetadataMap);
requestedMetadata.put(streamName, systemStreamMetadata);
}
+ return requestedMetadata;
} catch (Exception e) {
String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s", systemName);
LOG.error(msg, e);
throw new SamzaException(msg, e);
- } finally {
- // Closing clients
- eventHubClients.forEach((streamName, client) -> client.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
- eventHubClients.clear();
}
+ }
- return requestedMetadata;
+ @Override
+ public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+ return startpoint.apply(systemStreamPartition, eventHubSamzaOffsetResolver); // delegates to the startpoint, passing the EventHub offset resolver as the visitor
}
- private EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
+ EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
if (!eventHubClients.containsKey(streamName)) {
LOG.info(String.format("Creating EventHubClient for Stream=%s", streamName));
@@ -205,4 +234,75 @@ public class EventHubSystemAdmin implements SystemAdmin {
public Integer offsetComparator(String offset1, String offset2) {
return compareOffsets(offset1, offset2);
}
+
+ /**
+ * An EventHub-specific implementation of {@link StartpointVisitor} that resolves
+ * the different types of {@link Startpoint} to a Samza offset.
+ */
+ @VisibleForTesting
+ static class EventHubSamzaOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
+
+ private final EventHubSystemAdmin eventHubSystemAdmin; // supplies the per-stream EventHub client manager
+ private final EventHubConfig eventHubConfig; // used to look up the consumer group of a stream
+
+ EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) {
+ this.eventHubSystemAdmin = eventHubSystemAdmin;
+ this.eventHubConfig = eventHubConfig;
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { // a specific offset is returned unchanged
+ return startpointSpecific.getSpecificOffset();
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { // resolved by reading one event from a receiver positioned by enqueued time
+ String streamName = systemStreamPartition.getStream();
+ EventHubClientManager eventHubClientManager = eventHubSystemAdmin.getOrCreateStreamEventHubClient(streamName);
+ EventHubClient eventHubClient = eventHubClientManager.getEventHubClient();
+
+ PartitionReceiver partitionReceiver = null;
+ try {
+ // 1. Initialize the arguments required for creating the partition receiver.
+ String partitionId = String.valueOf(systemStreamPartition.getPartition().getPartitionId());
+ Instant epochInMillisInstant = Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset());
+ EventPosition eventPosition = EventPosition.fromEnqueuedTime(epochInMillisInstant);
+ String consumerGroup = eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamName);
+
+ // 2. Create a partition receiver with event position defined by the timestamp.
+ partitionReceiver = eventHubClient.createReceiverSync(consumerGroup, partitionId, eventPosition);
+
+ // 3. Read a single message from the partition receiver (receiveSync may return null when no event is available).
+ Iterable<EventData> eventHubMessagesIterator = partitionReceiver.receiveSync(1);
+ ArrayList<EventData> eventHubMessageList = eventHubMessagesIterator == null ? new ArrayList<EventData>() : Lists.newArrayList(eventHubMessagesIterator);
+
+ // 4. Validate that a single message was fetched from the broker (a null batch is reported here instead of as a NullPointerException).
+ Preconditions.checkState(eventHubMessageList.size() == 1, "Failed to read messages from EventHub system.");
+
+ // 5. Return the offset present in the metadata of the first message.
+ return eventHubMessageList.get(0).getSystemProperties().getOffset();
+ } catch (EventHubException e) {
+ LOG.error(String.format("Exception occurred when fetching offset for timestamp: %d from the stream: %s", startpointTimestamp.getTimestampOffset(), streamName), e);
+ throw new SamzaException(e);
+ } finally {
+ if (partitionReceiver != null) { // null when createReceiverSync itself failed
+ try {
+ partitionReceiver.closeSync();
+ } catch (EventHubException e) { // logged only, so a close failure does not mask the result or the original exception
+ LOG.error(String.format("Exception occurred when closing partition-receiver of the stream: %s", streamName), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) { // oldest maps to the consumer's start-of-stream marker
+ return EventHubSystemConsumer.START_OF_STREAM;
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) { // upcoming maps to the consumer's end-of-stream marker
+ return EventHubSystemConsumer.END_OF_STREAM;
+ }
+ }
}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index e45d3f4..befbf3a 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -19,11 +19,24 @@
package org.apache.samza.system.eventhub.admin;
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import java.util.Arrays;
import org.apache.samza.Partition;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.EventHubSystemFactory;
import org.apache.samza.system.eventhub.MockEventHubConfigFactory;
+import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin.EventHubSamzaOffsetResolver;
import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
import org.junit.Assert;
@@ -33,6 +46,7 @@ import org.junit.Test;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.mockito.Mockito;
import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
@@ -80,4 +94,65 @@ public class TestEventHubSystemAdmin {
}
}
+ @Test
+ public void testStartpointResolverShouldResolveTheStartpointOldestToCorrectOffset() {
+ EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class); // admin and config are unstubbed mocks in this test
+ EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+
+ EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+
+ Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointOldest())); // oldest must resolve to the start-of-stream offset
+ }
+
+ @Test
+ public void testStartpointResolverShouldResolveTheStartpointUpcomingToCorrectOffset() {
+ EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class); // admin and config are unstubbed mocks in this test
+ EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+
+ EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+
+ Assert.assertEquals(EventHubSystemConsumer.END_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointUpcoming())); // upcoming must resolve to the end-of-stream offset
+ }
+
+ @Test
+ public void testStartpointResolverShouldResolveTheStartpointSpecificToCorrectOffset() {
+ EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class); // admin and config are unstubbed mocks in this test
+ EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+
+ EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+
+ Assert.assertEquals("100", resolver.visit(systemStreamPartition, new StartpointSpecific("100"))); // the specific offset must come back unchanged
+ }
+
+ @Test
+ public void testStartpointResolverShouldResolveTheStartpointTimestampToCorrectOffset() throws EventHubException {
+ // Initialize variables required for testing.
+ EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
+ EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+ String mockedOffsetToReturn = "100";
+
+ // Setup the mock variables.
+ EventHubClientManager mockEventHubClientManager = Mockito.mock(EventHubClientManager.class);
+ EventHubClient mockEventHubClient = Mockito.mock(EventHubClient.class);
+ PartitionReceiver mockPartitionReceiver = Mockito.mock(PartitionReceiver.class);
+ EventData mockEventData = Mockito.mock(EventData.class);
+ EventData.SystemProperties mockSystemProperties = Mockito.mock(EventData.SystemProperties.class);
+
+ // Configure the mock variables to return the appropriate values.
+ Mockito.when(mockEventHubSystemAdmin.getOrCreateStreamEventHubClient("test-stream")).thenReturn(mockEventHubClientManager); // the resolver asks the admin for the client of "test-stream"
+ Mockito.when(mockEventHubClientManager.getEventHubClient()).thenReturn(mockEventHubClient);
+ Mockito.when(mockEventHubClient.createReceiverSync(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(mockPartitionReceiver);
+ Mockito.when(mockPartitionReceiver.receiveSync(1)).thenReturn(Arrays.asList(mockEventData)); // the receiver yields exactly one event
+ Mockito.when(mockEventData.getSystemProperties()).thenReturn(mockSystemProperties);
+ Mockito.when(mockSystemProperties.getOffset()).thenReturn(mockedOffsetToReturn);
+
+ // Test the Offset resolver.
+ EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
+ String resolvedOffset = resolver.visit(systemStreamPartition, new StartpointTimestamp(100L));
+ Assert.assertEquals(mockedOffsetToReturn, resolvedOffset); // the resolved offset must be the one carried by that event
+ }
}