You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/05/01 18:12:23 UTC
[samza] branch master updated: SAMZA-2517 : Adding handling and relevant exception message for null in oldest offset from system-admin (#1353)
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9977adf SAMZA-2517 : Adding handling and relevant exception message for null in oldest offset from system-admin (#1353)
9977adf is described below
commit 9977adf32da1bb17e11bed452196a0be666cb33b
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Fri May 1 11:12:14 2020 -0700
SAMZA-2517 : Adding handling and relevant exception message for null in oldest offset from system-admin (#1353)
* Adding handling and relevant exception message for null in oldest offset from system-admin
Co-authored-by: Ray Manpreet Singh Matharu <rm...@rmatharu-mn1.linkedin.biz>
---
.../apache/samza/storage/StorageManagerUtil.java | 3 +-
.../samza/storage/TaskSideInputStorageManager.java | 18 +++++-----
.../samza/storage/ContainerStorageManager.java | 3 +-
.../storage/TestTaskSideInputStorageManager.java | 39 ++++++++++++++++++++++
4 files changed, 53 insertions(+), 10 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 50d7950..c2ebe44 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -94,7 +94,8 @@ public class StorageManagerUtil {
+ " The values between these offsets cannot be restored.", resumeOffset, oldestOffset);
}
}
-
+ LOG.info("Starting offset for SystemStreamPartition {} is {}, fileOffset: {}, oldestOffset from source: {}", ssp,
+ startingOffset, fileOffset, oldestOffset);
return startingOffset;
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index e2cfe1d..9cd888a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -118,10 +118,10 @@ public class TaskSideInputStorageManager {
LOG.info("Initializing side input stores.");
Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
- LOG.info("File offsets for the task {}: ", taskName, fileOffsets);
+ LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
- LOG.info("Oldest offsets for the task {}: ", taskName, fileOffsets);
+ LOG.info("Oldest offsets for the task {}: {}", taskName, oldestOffsets);
startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
@@ -346,7 +346,8 @@ public class TaskSideInputStorageManager {
* 3. Fetches the partition metadata for each system stream and fetch the corresponding partition metadata
* and populates the oldest offset for SSPs belonging to the system stream.
*
- * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset.
+ * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. If partitionMetadata could not be
+ * obtained for any {@link SystemStreamPartition} the offset for it is populated as null.
*/
@VisibleForTesting
Map<SystemStreamPartition, String> getOldestOffsets() {
@@ -363,16 +364,17 @@ public class TaskSideInputStorageManager {
// Step 3
metadata.forEach((systemStream, systemStreamMetadata) -> {
+
// get the partition metadata for each system stream
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
systemStreamMetadata.getSystemStreamPartitionMetadata();
// For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
- Map<SystemStreamPartition, String> offsets = systemStreamToSsp.get(systemStream).stream()
- .collect(
- Collectors.toMap(Function.identity(), ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset()));
-
- oldestOffsets.putAll(offsets);
+ // if partitionMetadata was not obtained for any SSP, populate oldest-offset as null
+ // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using lambda will NPE when getOldestOffset() is null
+ for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+ oldestOffsets.put(ssp, partitionMetadata.get(ssp.getPartition()).getOldestOffset());
+ }
});
return oldestOffsets;
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 8623e5d..6e59e55 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -738,7 +738,8 @@ public class ContainerStorageManager {
String startingOffset = sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
if (startingOffset == null) {
- throw new SamzaException("No offset defined for SideInput SystemStreamPartition : " + ssp);
+ throw new SamzaException(
+ "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start.");
}
// register startingOffset with the sysConsumer and register a metric for it
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
index a7cefa0..6761702 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage;
import com.google.common.collect.ImmutableSet;
import java.io.File;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -34,9 +35,12 @@ import org.apache.samza.job.model.TaskMode;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -182,6 +186,36 @@ public class TestTaskSideInputStorageManager {
});
}
+ /**
+ * This test is for cases, when calls to systemAdmin (e.g., KafkaSystemAdmin's) get-stream-metadata method return null.
+ */
+ @Test
+ public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+ final String storeName = "test-get-starting-offset-store";
+ final String taskName = "test-get-starting-offset-task";
+
+ Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+ .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
+ .collect(Collectors.toSet());
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+ .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+ x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+
+ TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+ .addLoggedStore(storeName, ssps)
+ .addStreamMetadata(Collections.singletonMap(new SystemStream("test-system", "test-stream"),
+ new SystemStreamMetadata("test-stream", partitionMetadata)))
+ .build();
+
+ initializeSideInputStorageManager(testSideInputStorageManager);
+ ssps.forEach(ssp -> {
+ String startingOffset = testSideInputStorageManager.getStartingOffset(
+ new SystemStreamPartition("test-system", "test-stream", ssp.getPartition()));
+ Assert.assertNull("Starting offset should be null", startingOffset);
+ });
+ }
+
@Test
public void testGetStartingOffsets() {
final String storeName = "test-get-starting-offset-store";
@@ -276,6 +310,11 @@ public class TestTaskSideInputStorageManager {
return this;
}
+ MockTaskSideInputStorageManagerBuilder addStreamMetadata(Map<SystemStream, SystemStreamMetadata> streamMetadata) {
+ doReturn(ScalaJavaUtil.toScalaMap(streamMetadata)).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
+ return this;
+ }
+
MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) {
StorageEngine storageEngine = mock(StorageEngine.class);
when(storageEngine.getStoreProperties()).thenReturn(