You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/05/19 01:28:27 UTC
[samza] branch master updated: SAMZA-2506: Inconsistent end of stream semantics in SystemStreamPartitionMetadata (#1345)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0a7760a SAMZA-2506: Inconsistent end of stream semantics in SystemStreamPartitionMetadata (#1345)
0a7760a is described below
commit 0a7760a55be2b6089088016c55217814b5c9675f
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Mon May 18 18:28:19 2020 -0700
SAMZA-2506: Inconsistent end of stream semantics in SystemStreamPartitionMetadata (#1345)
---
.../samza/system/inmemory/InMemoryManager.java | 37 ++++++++++++++++++----
.../samza/system/inmemory/TestInMemoryManager.java | 9 ++++++
2 files changed, 40 insertions(+), 6 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index f3028f9..9055819 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -181,12 +181,37 @@ class InMemoryManager {
.stream()
.collect(Collectors.toMap(entry -> entry.getKey().getPartition(), entry -> {
List<IncomingMessageEnvelope> messages = entry.getValue();
- String oldestOffset = messages.isEmpty() ? null : "0";
- String newestOffset = messages.isEmpty() ? null : String.valueOf(messages.size() - 1);
- String upcomingOffset = String.valueOf(messages.size());
-
- return new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset);
-
+ Integer oldestOffset;
+ Integer newestOffset;
+ int upcomingOffset;
+
+ if (messages.isEmpty()) {
+ oldestOffset = null;
+ newestOffset = null;
+ upcomingOffset = 0;
+ } else if (messages.get(messages.size() - 1).isEndOfStream()) {
+ if (messages.size() > 1) {
+ // don't count end of stream in offset indices
+ oldestOffset = 0;
+ newestOffset = messages.size() - 2;
+ upcomingOffset = messages.size() - 1;
+ } else {
+ // end of stream is the only message, treat the same as empty
+ oldestOffset = null;
+ newestOffset = null;
+ upcomingOffset = 0;
+ }
+ } else {
+ // offsets correspond strictly to numeric indices
+ oldestOffset = 0;
+ newestOffset = messages.size() - 1;
+ upcomingOffset = messages.size();
+ }
+
+ return new SystemStreamMetadata.SystemStreamPartitionMetadata(
+ oldestOffset == null ? null : oldestOffset.toString(),
+ newestOffset == null ? null : newestOffset.toString(),
+ Integer.toString(upcomingOffset));
}));
return new SystemStreamMetadata(streamName, partitionMetadata);
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
index 7a32483..890b669 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
+import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamMetadata;
@@ -77,9 +78,17 @@ public class TestInMemoryManager {
ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
SystemStreamMetadata systemStreamMetadata1 = new SystemStreamMetadata(STREAM1,
ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "0", "1")));
+
// also test a batch call for multiple streams here
assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0, STREAM1, systemStreamMetadata1),
this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0, STREAM1)));
+
+ // test END_OF_STREAM doesn't alter newest or upcoming offset
+ this.inMemoryManager.put(ssp0, "key02", new EndOfStreamMessage());
+ systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+ ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
+ assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0),
+ this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0)));
}
@Test