You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/05/19 01:28:27 UTC

[samza] branch master updated: SAMZA-2506: Inconsistent end of stream semantics in SystemStreamPartitionMetadata (#1345)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a7760a  SAMZA-2506: Inconsistent end of stream semantics in SystemStreamPartitionMetadata (#1345)
0a7760a is described below

commit 0a7760a55be2b6089088016c55217814b5c9675f
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Mon May 18 18:28:19 2020 -0700

    SAMZA-2506: Inconsistent end of stream semantics in SystemStreamPartitionMetadata (#1345)
---
 .../samza/system/inmemory/InMemoryManager.java     | 37 ++++++++++++++++++----
 .../samza/system/inmemory/TestInMemoryManager.java |  9 ++++++
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index f3028f9..9055819 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -181,12 +181,37 @@ class InMemoryManager {
             .stream()
             .collect(Collectors.toMap(entry -> entry.getKey().getPartition(), entry -> {
                 List<IncomingMessageEnvelope> messages = entry.getValue();
-                String oldestOffset = messages.isEmpty() ? null : "0";
-                String newestOffset = messages.isEmpty() ? null : String.valueOf(messages.size() - 1);
-                String upcomingOffset = String.valueOf(messages.size());
-
-                return new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset);
-
+                Integer oldestOffset;
+                Integer newestOffset;
+                int upcomingOffset;
+
+                if (messages.isEmpty()) {
+                  oldestOffset = null;
+                  newestOffset = null;
+                  upcomingOffset = 0;
+                } else if (messages.get(messages.size() - 1).isEndOfStream()) {
+                  if (messages.size() > 1) {
+                    // don't count end of stream in offset indices
+                    oldestOffset = 0;
+                    newestOffset = messages.size() - 2;
+                    upcomingOffset = messages.size() - 1;
+                  } else {
+                    // end of stream is the only message, treat the same as empty
+                    oldestOffset = null;
+                    newestOffset = null;
+                    upcomingOffset = 0;
+                  }
+                } else {
+                  // offsets correspond strictly to numeric indices
+                  oldestOffset = 0;
+                  newestOffset = messages.size() - 1;
+                  upcomingOffset = messages.size();
+                }
+
+                return new SystemStreamMetadata.SystemStreamPartitionMetadata(
+                    oldestOffset == null ? null : oldestOffset.toString(),
+                    newestOffset == null ? null : newestOffset.toString(),
+                    Integer.toString(upcomingOffset));
               }));
 
     return new SystemStreamMetadata(streamName, partitionMetadata);
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
index 7a32483..890b669 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -77,9 +78,17 @@ public class TestInMemoryManager {
         ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
     SystemStreamMetadata systemStreamMetadata1 = new SystemStreamMetadata(STREAM1,
         ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "0", "1")));
+
     // also test a batch call for multiple streams here
     assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0, STREAM1, systemStreamMetadata1),
         this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0, STREAM1)));
+
+    // test END_OF_STREAM doesn't alter new or upcoming offset
+    this.inMemoryManager.put(ssp0, "key02", new EndOfStreamMessage());
+    systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0)));
   }
 
   @Test