You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/09/26 19:42:12 UTC

samza git commit: SAMZA-1914: fix out of range starting offset in EH consumer

Repository: samza
Updated Branches:
  refs/heads/master cfbb9c6eb -> 03410b80c


SAMZA-1914: fix out of range starting offset in EH consumer

Author: Hai Lu <ha...@linkedin.com>

Reviewers: Srinivasulu <sp...@linkedin.com>

Closes #664 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/03410b80
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/03410b80
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/03410b80

Branch: refs/heads/master
Commit: 03410b80c674fc5001ff02db13b269df76dd0fae
Parents: cfbb9c6
Author: Hai Lu <ha...@linkedin.com>
Authored: Wed Sep 26 12:42:06 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Wed Sep 26 12:42:06 2018 -0700

----------------------------------------------------------------------
 .../eventhub/admin/EventHubSystemAdmin.java      | 15 ++++-----------
 .../consumer/EventHubSystemConsumer.java         |  6 ++----
 .../eventhub/admin/TestEventHubSystemAdmin.java  | 19 -------------------
 3 files changed, 6 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 2141ebd..27abe07 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -65,19 +65,12 @@ public class EventHubSystemAdmin implements SystemAdmin {
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
   }
 
-  private String getNextOffset(String currentOffset) {
-    // EventHub will return the first message AFTER the offset
-    // that was specified in the fetch request.
-    // If no such offset exists Eventhub will return an error.
-    return String.valueOf(Long.parseLong(currentOffset) + 1);
-  }
-
   @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
-    Map<SystemStreamPartition, String> results = new HashMap<>();
-
-    offsets.forEach((partition, offset) -> results.put(partition, getNextOffset(offset)));
-    return results;
+    // In EventHubSystemConsumer#initializeEventHubsManagers, we exclude the offset that we specify, i.e.
+    // we will only get the messages after the checkpointed offset. Hence, by returning the same offset as
+    // the "next" offset, we won't be reprocessing the same event.
+    return offsets;
   }
 
   // EventHubRuntimeInformation does not implement toString()

http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 454fc57..a05b5e2 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -276,13 +276,11 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
           receiver = eventHubClientManager.getEventHubClient()
               .createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now()));
         } else {
-          // If the offset is less or equal to the newest offset in the system, it can be
-          // used as the starting offset to receive from. EventHub will return the first
-          // message AFTER the offset that was specified in the fetch request.
+          // EventHub will return the first message AFTER the offset that was specified in the fetch request.
           // If no such offset exists Eventhub will return an error.
           receiver = eventHubClientManager.getEventHubClient()
               .createReceiverSync(consumerGroup, partitionId.toString(),
-                  EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
+                  EventPosition.fromOffset(offset, /* inclusiveFlag */false));
         }
 
         receiver.setPrefetchCount(prefetchCount);

http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index 8861152..e45d3f4 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -22,7 +22,6 @@ package org.apache.samza.system.eventhub.admin;
 import org.apache.samza.Partition;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.eventhub.EventHubSystemFactory;
 import org.apache.samza.system.eventhub.MockEventHubConfigFactory;
 import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
@@ -31,7 +30,6 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -53,23 +51,6 @@ public class TestEventHubSystemAdmin {
     Assert.assertNull(eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM, EventHubSystemConsumer.END_OF_STREAM));
   }
 
-  @Test
-  public void testGetNextOffset() {
-    EventHubSystemFactory eventHubSystemFactory = new EventHubSystemFactory();
-    SystemAdmin eventHubSystemAdmin = eventHubSystemFactory.getAdmin(SYSTEM_NAME,
-            MockEventHubConfigFactory.getEventHubConfig(EventHubSystemProducer.PartitioningMethod.EVENT_HUB_HASHING));
-    Map<SystemStreamPartition, String> offsets = new HashMap<>();
-    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(0));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(2));
-    offsets.put(ssp0, Integer.toString(0));
-    offsets.put(ssp2, EventHubSystemConsumer.START_OF_STREAM);
-
-    Map<SystemStreamPartition, String> updatedOffsets = eventHubSystemAdmin.getOffsetsAfter(offsets);
-    Assert.assertEquals(offsets.size(), updatedOffsets.size());
-    Assert.assertEquals("1", updatedOffsets.get(ssp0));
-    Assert.assertEquals("0", updatedOffsets.get(ssp2));
-  }
-
   @Ignore("Integration Test")
   @Test
   public void testGetStreamMetadata() {