You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/05/01 18:12:23 UTC

[samza] branch master updated: SAMZA-2517 : Adding handling and relevant exception message for null in oldest offset from system-admin (#1353)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9977adf  SAMZA-2517 : Adding handling and relevant exception message for null in oldest offset from system-admin (#1353)
9977adf is described below

commit 9977adf32da1bb17e11bed452196a0be666cb33b
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Fri May 1 11:12:14 2020 -0700

    SAMZA-2517 : Adding handling and relevant exception message for null in oldest offset from system-admin (#1353)
    
    * Add handling, and a more descriptive exception message, for a null oldest offset returned by the system admin
    
    Co-authored-by: Ray Manpreet  Singh Matharu <rm...@rmatharu-mn1.linkedin.biz>
---
 .../apache/samza/storage/StorageManagerUtil.java   |  3 +-
 .../samza/storage/TaskSideInputStorageManager.java | 18 +++++-----
 .../samza/storage/ContainerStorageManager.java     |  3 +-
 .../storage/TestTaskSideInputStorageManager.java   | 39 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 50d7950..c2ebe44 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -94,7 +94,8 @@ public class StorageManagerUtil {
             + " The values between these offsets cannot be restored.", resumeOffset, oldestOffset);
       }
     }
-
+    LOG.info("Starting offset for SystemStreamPartition {} is {}, fileOffset: {}, oldestOffset from source: {}", ssp,
+        startingOffset, fileOffset, oldestOffset);
     return startingOffset;
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index e2cfe1d..9cd888a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -118,10 +118,10 @@ public class TaskSideInputStorageManager {
     LOG.info("Initializing side input stores.");
 
     Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
-    LOG.info("File offsets for the task {}: ", taskName, fileOffsets);
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
 
     Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
-    LOG.info("Oldest offsets for the task {}: ", taskName, fileOffsets);
+    LOG.info("Oldest offsets for the task {}: {}", taskName, oldestOffsets);
 
     startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
@@ -346,7 +346,8 @@ public class TaskSideInputStorageManager {
    *   3. Fetches the partition metadata for each system stream and fetch the corresponding partition metadata
    *      and populates the oldest offset for SSPs belonging to the system stream.
    *
-   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset.
+   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. If partitionMetadata could not be
+   * obtained for any {@link SystemStreamPartition} the offset for it is populated as null.
    */
   @VisibleForTesting
   Map<SystemStreamPartition, String> getOldestOffsets() {
@@ -363,16 +364,17 @@ public class TaskSideInputStorageManager {
 
     // Step 3
     metadata.forEach((systemStream, systemStreamMetadata) -> {
+
         // get the partition metadata for each system stream
         Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
           systemStreamMetadata.getSystemStreamPartitionMetadata();
 
         // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
-        Map<SystemStreamPartition, String> offsets = systemStreamToSsp.get(systemStream).stream()
-          .collect(
-              Collectors.toMap(Function.identity(), ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset()));
-
-        oldestOffsets.putAll(offsets);
+        // if partitionMetadata was not obtained for any SSP, populate oldest-offset as null
+        // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using lambda will NPE when getOldestOffset() is null
+        for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+          oldestOffsets.put(ssp, partitionMetadata.get(ssp.getPartition()).getOldestOffset());
+        }
       });
 
     return oldestOffsets;
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 8623e5d..6e59e55 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -738,7 +738,8 @@ public class ContainerStorageManager {
       String startingOffset = sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
 
       if (startingOffset == null) {
-        throw new SamzaException("No offset defined for SideInput SystemStreamPartition : " + ssp);
+        throw new SamzaException(
+            "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start.");
       }
 
       // register startingOffset with the sysConsumer and register a metric for it
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
index a7cefa0..6761702 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage;
 
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -34,9 +35,12 @@ import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -182,6 +186,36 @@ public class TestTaskSideInputStorageManager {
       });
   }
 
+  /**
+   * This test is for cases, when calls to systemAdmin (e.g., KafkaSystemAdmin's) get-stream-metadata method return null.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+        .addLoggedStore(storeName, ssps)
+        .addStreamMetadata(Collections.singletonMap(new SystemStream("test-system", "test-stream"),
+            new SystemStreamMetadata("test-stream", partitionMetadata)))
+        .build();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+    ssps.forEach(ssp -> {
+        String startingOffset = testSideInputStorageManager.getStartingOffset(
+            new SystemStreamPartition("test-system", "test-stream", ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
   @Test
   public void testGetStartingOffsets() {
     final String storeName = "test-get-starting-offset-store";
@@ -276,6 +310,11 @@ public class TestTaskSideInputStorageManager {
       return this;
     }
 
+    MockTaskSideInputStorageManagerBuilder addStreamMetadata(Map<SystemStream, SystemStreamMetadata> streamMetadata) {
+      doReturn(ScalaJavaUtil.toScalaMap(streamMetadata)).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
+      return this;
+    }
+
     MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) {
       StorageEngine storageEngine = mock(StorageEngine.class);
       when(storageEngine.getStoreProperties()).thenReturn(