You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which is not preserved in this text rendering.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/09/26 19:42:12 UTC
samza git commit: SAMZA-1914: fix out of range starting offset in EH consumer
Repository: samza
Updated Branches:
refs/heads/master cfbb9c6eb -> 03410b80c
SAMZA-1914: fix out of range starting offset in EH consumer
Author: Hai Lu <ha...@linkedin.com>
Reviewers: Srinivasulu <sp...@linkedin.com>
Closes #664 from lhaiesp/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/03410b80
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/03410b80
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/03410b80
Branch: refs/heads/master
Commit: 03410b80c674fc5001ff02db13b269df76dd0fae
Parents: cfbb9c6
Author: Hai Lu <ha...@linkedin.com>
Authored: Wed Sep 26 12:42:06 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Wed Sep 26 12:42:06 2018 -0700
----------------------------------------------------------------------
.../eventhub/admin/EventHubSystemAdmin.java | 15 ++++-----------
.../consumer/EventHubSystemConsumer.java | 6 ++----
.../eventhub/admin/TestEventHubSystemAdmin.java | 19 -------------------
3 files changed, 6 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 2141ebd..27abe07 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -65,19 +65,12 @@ public class EventHubSystemAdmin implements SystemAdmin {
this.eventHubClientManagerFactory = eventHubClientManagerFactory;
}
- private String getNextOffset(String currentOffset) {
- // EventHub will return the first message AFTER the offset
- // that was specified in the fetch request.
- // If no such offset exists Eventhub will return an error.
- return String.valueOf(Long.parseLong(currentOffset) + 1);
- }
-
@Override
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
- Map<SystemStreamPartition, String> results = new HashMap<>();
-
- offsets.forEach((partition, offset) -> results.put(partition, getNextOffset(offset)));
- return results;
+ // In EventHubSystemConsumer#initializeEventHubsManagers, we exclude the offset that we specify. i.e.
+ // we will only get the message after the checkpoint offset. Hence, by returning the same offset as the
+ // "next" offset, we won't be reprocessing the same event.
+ return offsets;
}
// EventHubRuntimeInformation does not implement toString()
http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 454fc57..a05b5e2 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -276,13 +276,11 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
receiver = eventHubClientManager.getEventHubClient()
.createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now()));
} else {
- // If the offset is less or equal to the newest offset in the system, it can be
- // used as the starting offset to receive from. EventHub will return the first
- // message AFTER the offset that was specified in the fetch request.
+ // EventHub will return the first message AFTER the offset that was specified in the fetch request.
// If no such offset exists Eventhub will return an error.
receiver = eventHubClientManager.getEventHubClient()
.createReceiverSync(consumerGroup, partitionId.toString(),
- EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)));
+ EventPosition.fromOffset(offset, /* inclusiveFlag */false));
}
receiver.setPrefetchCount(prefetchCount);
http://git-wip-us.apache.org/repos/asf/samza/blob/03410b80/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index 8861152..e45d3f4 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -22,7 +22,6 @@ package org.apache.samza.system.eventhub.admin;
import org.apache.samza.Partition;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.eventhub.EventHubSystemFactory;
import org.apache.samza.system.eventhub.MockEventHubConfigFactory;
import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
@@ -31,7 +30,6 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -53,23 +51,6 @@ public class TestEventHubSystemAdmin {
Assert.assertNull(eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM, EventHubSystemConsumer.END_OF_STREAM));
}
- @Test
- public void testGetNextOffset() {
- EventHubSystemFactory eventHubSystemFactory = new EventHubSystemFactory();
- SystemAdmin eventHubSystemAdmin = eventHubSystemFactory.getAdmin(SYSTEM_NAME,
- MockEventHubConfigFactory.getEventHubConfig(EventHubSystemProducer.PartitioningMethod.EVENT_HUB_HASHING));
- Map<SystemStreamPartition, String> offsets = new HashMap<>();
- SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(0));
- SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(2));
- offsets.put(ssp0, Integer.toString(0));
- offsets.put(ssp2, EventHubSystemConsumer.START_OF_STREAM);
-
- Map<SystemStreamPartition, String> updatedOffsets = eventHubSystemAdmin.getOffsetsAfter(offsets);
- Assert.assertEquals(offsets.size(), updatedOffsets.size());
- Assert.assertEquals("1", updatedOffsets.get(ssp0));
- Assert.assertEquals("0", updatedOffsets.get(ssp2));
- }
-
@Ignore("Integration Test")
@Test
public void testGetStreamMetadata() {