You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2023/03/06 21:55:27 UTC

[samza] branch master updated: Side Inputs + Blob Store Backups - Fix duplicate uploads and broken metrics.

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9041b3a4d Side Inputs + Blob Store Backups - Fix duplicate uploads and broken metrics.
     new 3b5439283 Merge pull request #1654 from prateekm/master
9041b3a4d is described below

commit 9041b3a4d4d6dda1e16e509448aafcc503ddd6b2
Author: Prateek Maheshwari <pm...@linkedin.com>
AuthorDate: Mon Mar 6 11:48:42 2023 -0800

    Side Inputs + Blob Store Backups - Fix duplicate uploads and broken metrics.
---
 .../org/apache/samza/config/StorageConfig.java     |  6 ++-
 .../samza/storage/TaskStorageCommitManager.java    |  2 +-
 .../storage/blobstore/BlobStoreBackupManager.java  |  9 ++--
 .../metrics/BlobStoreBackupManagerMetrics.java     | 49 ++++++++++++----------
 .../org/apache/samza/config/TestStorageConfig.java | 48 ++++++++++++++++-----
 5 files changed, 75 insertions(+), 39 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index b170bacde..0bb993f19 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -22,6 +22,7 @@ package org.apache.samza.config;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -99,7 +100,8 @@ public class StorageConfig extends MapConfig {
 
   public List<String> getStoreNames() {
     Config subConfig = subset(STORE_PREFIX, true);
-    List<String> storeNames = new ArrayList<>();
+    // A side input store can have both a store factory and a side input processor factory config; use a Set to dedup.
+    Set<String> storeNames = new HashSet<>();
     for (String key : subConfig.keySet()) {
       if (key.endsWith(SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX)) {
         storeNames.add(key.substring(0, key.length() - SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX.length()));
@@ -107,7 +109,7 @@ public class StorageConfig extends MapConfig {
         storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
       }
     }
-    return storeNames;
+    return new ArrayList<>(storeNames);
   }
 
   public Map<String, SystemStream> getStoreChangelogs() {
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index 09a880710..0c01fa1ac 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -133,7 +133,7 @@ public class TaskStorageCommitManager {
     // state backend factory -> store Name -> state checkpoint marker
     Map<String, Map<String, String>> stateBackendToStoreSCMs = new HashMap<>();
 
-    // for each configured state backend factory, backup the state for all stores in this task.
+    // for each configured state backend factory, snapshot the state for all stores in this task.
     stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> {
       Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId);
       LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. Snapshot SCMs: {}",
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 2b41d09de..cb7ffad8a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -21,6 +21,8 @@ package org.apache.samza.storage.blobstore;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,6 +31,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutorService;
@@ -73,7 +76,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
   private final BlobStoreConfig blobStoreConfig;
   private final Clock clock;
   private final StorageManagerUtil storageManagerUtil;
-  private final List<String> storesToBackup;
+  private final Set<String> storesToBackup;
   private final File loggedStoreBaseDir;
   private final BlobStoreManager blobStoreManager;
   private final BlobStoreUtil blobStoreUtil;
@@ -117,8 +120,8 @@ public class BlobStoreBackupManager implements TaskBackupManager {
     this.clock = clock;
     this.storageManagerUtil = storageManagerUtil;
     StorageConfig storageConfig = new StorageConfig(config);
-    this.storesToBackup =
-        storageConfig.getPersistentStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    this.storesToBackup = ImmutableSet.copyOf(
+        storageConfig.getPersistentStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName()));
     this.loggedStoreBaseDir = loggedStoreBaseDir;
     this.blobStoreManager = blobStoreManager;
     this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, blobStoreConfig, blobStoreTaskBackupMetrics);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java
index 5e41073f2..22b755ae1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java
@@ -107,30 +107,33 @@ public class BlobStoreBackupManagerMetrics {
   }
 
   public void initStoreMetrics(Collection<String> storeNames) {
+    // MetricRegistryMap#newGauge overwrites an existing gauge, while newTimer retains and returns the existing timer.
+    // putIfAbsent evaluates its value argument eagerly, so for now use computeIfAbsent to avoid overwriting old
+    // gauges with a new untracked one. This also keeps usage consistent between gauges and timers.
     for (String storeName: storeNames) {
-      storeDirDiffNs.putIfAbsent(storeName,
-          metricsRegistry.newTimer(GROUP, String.format("%s-dir-diff-ns", storeName)));
-      storeUploadNs.putIfAbsent(storeName,
-          metricsRegistry.newTimer(GROUP, String.format("%s-upload-ns", storeName)));
-
-      storeFilesToUpload.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-files-to-upload", storeName), 0L));
-      storeFilesToRetain.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-files-to-retain", storeName), 0L));
-      storeFilesToRemove.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-files-to-remove", storeName), 0L));
-      storeSubDirsToUpload.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-upload", storeName), 0L));
-      storeSubDirsToRetain.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-retain", storeName), 0L));
-      storeSubDirsToRemove.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-remove", storeName), 0L));
-      storeBytesToUpload.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-upload", storeName), 0L));
-      storeBytesToRetain.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-retain", storeName), 0L));
-      storeBytesToRemove.putIfAbsent(storeName,
-          metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-remove", storeName), 0L));
+      storeDirDiffNs.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newTimer(GROUP, String.format("%s-dir-diff-ns", kStoreName)));
+      storeUploadNs.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newTimer(GROUP, String.format("%s-upload-ns", kStoreName)));
+
+      storeFilesToUpload.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-files-to-upload", kStoreName), 0L));
+      storeFilesToRetain.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-files-to-retain", kStoreName), 0L));
+      storeFilesToRemove.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-files-to-remove", kStoreName), 0L));
+      storeSubDirsToUpload.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-upload", kStoreName), 0L));
+      storeSubDirsToRetain.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-retain", kStoreName), 0L));
+      storeSubDirsToRemove.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-remove", kStoreName), 0L));
+      storeBytesToUpload.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-upload", kStoreName), 0L));
+      storeBytesToRetain.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-retain", kStoreName), 0L));
+      storeBytesToRemove.computeIfAbsent(storeName,
+        kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-remove", kStoreName), 0L));
     }
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 0d06c3b28..953f6efe6 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -60,6 +60,34 @@ public class TestStorageConfig {
     assertEquals(2, actual.size());
     assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
 
+    //has side input stores
+    StorageConfig config = new StorageConfig(new MapConfig(
+        ImmutableMap.of(String.format(FACTORY, STORE_NAME0), "store0.factory.class",
+            String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME1), "store1.factory.class")));
+
+    actual = config.getStoreNames();
+
+    assertEquals(2, actual.size());
+    assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+  }
+
+  @Test
+  public void testGetStoreNamesDoesNotReturnDuplicatesForSideInputs() {
+    // empty config, so no stores
+    assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig()).getStoreNames());
+
+    Set<String> expectedStoreNames = ImmutableSet.of(STORE_NAME0, STORE_NAME1);
+    // has stores
+    StorageConfig storageConfig = new StorageConfig(new MapConfig(
+        ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "store0.factory.class",
+            String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class",
+            String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME1), "store1.side.inputs.processor")));
+
+    List<String> actual = storageConfig.getStoreNames();
+    // ordering shouldn't matter
+    assertEquals(2, actual.size());
+    assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+
     //has side input stores
     StorageConfig config = new StorageConfig(new MapConfig(
         ImmutableMap.of(String.format(FACTORY, STORE_NAME0), "store0.factory.class",
@@ -504,8 +532,8 @@ public class TestStorageConfig {
     assertEquals(ImmutableSet.of(KAFKA_STATE_BACKEND_FACTORY), new StorageConfig(new MapConfig(configMap)).getBackupFactories());
     assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
     assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
-    assertEquals(ImmutableList.of(storeName2, storeName),
-        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+    assertEquals(ImmutableSet.of(storeName2, storeName),
+        ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY)));
 
     // job restore manager config set should override to job backend factory
     String jobBackupFactory1 = "jobBackendBackupFactory1";
@@ -514,16 +542,16 @@ public class TestStorageConfig {
     configMap.put(JOB_BACKUP_FACTORIES, jobBackupFactoryOverride);
     assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
         new StorageConfig(new MapConfig(configMap)).getBackupFactories());
-    assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
-        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
-    assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
-        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+    assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+        ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName)));
+    assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+        ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2)));
     assertEquals(Collections.emptyList(),
         new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
-    assertEquals(ImmutableList.of(storeName2, storeName),
-        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1));
-    assertEquals(ImmutableList.of(storeName2, storeName),
-        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2));
+    assertEquals(ImmutableSet.of(storeName2, storeName),
+        ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1)));
+    assertEquals(ImmutableSet.of(storeName2, storeName),
+        ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2)));
 
     // store specific restore managers set
     String storeBackupFactory1 = "storeBackendBackupFactory1";