You are viewing a plain-text version of this content. The canonical version of this message is in the commits@samza.apache.org mailing-list archive.
Posted to commits@samza.apache.org by pm...@apache.org on 2023/03/06 21:55:27 UTC
[samza] branch master updated: Side Inputs + Blob Store Backups - Fix duplicate uploads and broken metrics.
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9041b3a4d Side Inputs + Blob Store Backups - Fix duplicate uploads and broken metrics.
new 3b5439283 Merge pull request #1654 from prateekm/master
9041b3a4d is described below
commit 9041b3a4d4d6dda1e16e509448aafcc503ddd6b2
Author: Prateek Maheshwari <pm...@linkedin.com>
AuthorDate: Mon Mar 6 11:48:42 2023 -0800
Side Inputs + Blob Store Backups - Fix duplicate uploads and broken metrics.
---
.../org/apache/samza/config/StorageConfig.java | 6 ++-
.../samza/storage/TaskStorageCommitManager.java | 2 +-
.../storage/blobstore/BlobStoreBackupManager.java | 9 ++--
.../metrics/BlobStoreBackupManagerMetrics.java | 49 ++++++++++++----------
.../org/apache/samza/config/TestStorageConfig.java | 48 ++++++++++++++++-----
5 files changed, 75 insertions(+), 39 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index b170bacde..0bb993f19 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -22,6 +22,7 @@ package org.apache.samza.config;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -99,7 +100,8 @@ public class StorageConfig extends MapConfig {
public List<String> getStoreNames() {
Config subConfig = subset(STORE_PREFIX, true);
- List<String> storeNames = new ArrayList<>();
+ // side input store configs can contain both the store factory and side input processor factory configs. dedup.
+ Set<String> storeNames = new HashSet<>();
for (String key : subConfig.keySet()) {
if (key.endsWith(SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX)) {
storeNames.add(key.substring(0, key.length() - SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX.length()));
@@ -107,7 +109,7 @@ public class StorageConfig extends MapConfig {
storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
}
}
- return storeNames;
+ return new ArrayList<>(storeNames);
}
public Map<String, SystemStream> getStoreChangelogs() {
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index 09a880710..0c01fa1ac 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -133,7 +133,7 @@ public class TaskStorageCommitManager {
// state backend factory -> store Name -> state checkpoint marker
Map<String, Map<String, String>> stateBackendToStoreSCMs = new HashMap<>();
- // for each configured state backend factory, backup the state for all stores in this task.
+ // for each configured state backend factory, snapshot the state for all stores in this task.
stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> {
Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId);
LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. Snapshot SCMs: {}",
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 2b41d09de..cb7ffad8a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -21,6 +21,8 @@ package org.apache.samza.storage.blobstore;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
@@ -73,7 +76,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
private final BlobStoreConfig blobStoreConfig;
private final Clock clock;
private final StorageManagerUtil storageManagerUtil;
- private final List<String> storesToBackup;
+ private final Set<String> storesToBackup;
private final File loggedStoreBaseDir;
private final BlobStoreManager blobStoreManager;
private final BlobStoreUtil blobStoreUtil;
@@ -117,8 +120,8 @@ public class BlobStoreBackupManager implements TaskBackupManager {
this.clock = clock;
this.storageManagerUtil = storageManagerUtil;
StorageConfig storageConfig = new StorageConfig(config);
- this.storesToBackup =
- storageConfig.getPersistentStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
+ this.storesToBackup = ImmutableSet.copyOf(
+ storageConfig.getPersistentStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName()));
this.loggedStoreBaseDir = loggedStoreBaseDir;
this.blobStoreManager = blobStoreManager;
this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, blobStoreConfig, blobStoreTaskBackupMetrics);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java
index 5e41073f2..22b755ae1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreBackupManagerMetrics.java
@@ -107,30 +107,33 @@ public class BlobStoreBackupManagerMetrics {
}
public void initStoreMetrics(Collection<String> storeNames) {
+ // MetricRegistryMap#newGauge overwrites existing gauge, while newTimer retains and returns existing timer.
+ // For now, use computeIfAbsent instead of putIfAbsent to avoid overwriting old gauges and returning
+ // a new untracked one. Also to keep usage consistent b/w gauges and timers.
for (String storeName: storeNames) {
- storeDirDiffNs.putIfAbsent(storeName,
- metricsRegistry.newTimer(GROUP, String.format("%s-dir-diff-ns", storeName)));
- storeUploadNs.putIfAbsent(storeName,
- metricsRegistry.newTimer(GROUP, String.format("%s-upload-ns", storeName)));
-
- storeFilesToUpload.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-files-to-upload", storeName), 0L));
- storeFilesToRetain.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-files-to-retain", storeName), 0L));
- storeFilesToRemove.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-files-to-remove", storeName), 0L));
- storeSubDirsToUpload.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-upload", storeName), 0L));
- storeSubDirsToRetain.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-retain", storeName), 0L));
- storeSubDirsToRemove.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-remove", storeName), 0L));
- storeBytesToUpload.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-upload", storeName), 0L));
- storeBytesToRetain.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-retain", storeName), 0L));
- storeBytesToRemove.putIfAbsent(storeName,
- metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-remove", storeName), 0L));
+ storeDirDiffNs.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newTimer(GROUP, String.format("%s-dir-diff-ns", kStoreName)));
+ storeUploadNs.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newTimer(GROUP, String.format("%s-upload-ns", kStoreName)));
+
+ storeFilesToUpload.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-files-to-upload", kStoreName), 0L));
+ storeFilesToRetain.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-files-to-retain", kStoreName), 0L));
+ storeFilesToRemove.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-files-to-remove", kStoreName), 0L));
+ storeSubDirsToUpload.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-upload", kStoreName), 0L));
+ storeSubDirsToRetain.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-retain", kStoreName), 0L));
+ storeSubDirsToRemove.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-sub-dirs-to-remove", kStoreName), 0L));
+ storeBytesToUpload.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-upload", kStoreName), 0L));
+ storeBytesToRetain.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-retain", kStoreName), 0L));
+ storeBytesToRemove.computeIfAbsent(storeName,
+ kStoreName -> metricsRegistry.newGauge(GROUP, String.format("%s-bytes-to-remove", kStoreName), 0L));
}
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 0d06c3b28..953f6efe6 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -60,6 +60,34 @@ public class TestStorageConfig {
assertEquals(2, actual.size());
assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+ //has side input stores
+ StorageConfig config = new StorageConfig(new MapConfig(
+ ImmutableMap.of(String.format(FACTORY, STORE_NAME0), "store0.factory.class",
+ String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME1), "store1.factory.class")));
+
+ actual = config.getStoreNames();
+
+ assertEquals(2, actual.size());
+ assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+ }
+
+ @Test
+ public void testGetStoreNamesDoesNotReturnDuplicatesForSideInputs() {
+ // empty config, so no stores
+ assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig()).getStoreNames());
+
+ Set<String> expectedStoreNames = ImmutableSet.of(STORE_NAME0, STORE_NAME1);
+ // has stores
+ StorageConfig storageConfig = new StorageConfig(new MapConfig(
+ ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "store0.factory.class",
+ String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class",
+ String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME1), "store1.side.inputs.processor")));
+
+ List<String> actual = storageConfig.getStoreNames();
+ // ordering shouldn't matter
+ assertEquals(2, actual.size());
+ assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+
//has side input stores
StorageConfig config = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(FACTORY, STORE_NAME0), "store0.factory.class",
@@ -504,8 +532,8 @@ public class TestStorageConfig {
assertEquals(ImmutableSet.of(KAFKA_STATE_BACKEND_FACTORY), new StorageConfig(new MapConfig(configMap)).getBackupFactories());
assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
- assertEquals(ImmutableList.of(storeName2, storeName),
- new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+ assertEquals(ImmutableSet.of(storeName2, storeName),
+ ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY)));
// job restore manager config set should override to job backend factory
String jobBackupFactory1 = "jobBackendBackupFactory1";
@@ -514,16 +542,16 @@ public class TestStorageConfig {
configMap.put(JOB_BACKUP_FACTORIES, jobBackupFactoryOverride);
assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
new StorageConfig(new MapConfig(configMap)).getBackupFactories());
- assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
- new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
- assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
- new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+ assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+ ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName)));
+ assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+ ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2)));
assertEquals(Collections.emptyList(),
new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
- assertEquals(ImmutableList.of(storeName2, storeName),
- new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1));
- assertEquals(ImmutableList.of(storeName2, storeName),
- new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2));
+ assertEquals(ImmutableSet.of(storeName2, storeName),
+ ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1)));
+ assertEquals(ImmutableSet.of(storeName2, storeName),
+ ImmutableSet.copyOf(new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2)));
// store specific restore managers set
String storeBackupFactory1 = "storeBackendBackupFactory1";