You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/11/28 20:59:41 UTC
[samza] branch master updated: Fix performance bug with async commit (#1644)
This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2688cc11e Fix performance bug with async commit (#1644)
2688cc11e is described below
commit 2688cc11ea2cd4f5a91325a7161f869510e2a361
Author: shekhars-li <72...@users.noreply.github.com>
AuthorDate: Mon Nov 28 12:59:35 2022 -0800
Fix performance bug with async commit (#1644)
---
.../src/main/java/org/apache/samza/storage/StateBackendFactory.java | 3 +++
.../org/apache/samza/storage/KafkaChangelogStateBackendFactory.java | 4 +++-
.../apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java | 3 +++
.../src/main/scala/org/apache/samza/container/SamzaContainer.scala | 3 ++-
4 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
index 91f3df3d0..44b40b4dc 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
@@ -20,6 +20,7 @@
package org.apache.samza.storage;
import java.io.File;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.samza.config.Config;
@@ -29,6 +30,7 @@ import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.Clock;
@@ -40,6 +42,7 @@ public interface StateBackendFactory {
TaskBackupManager getBackupManager(JobContext jobContext,
ContainerModel containerModel,
TaskModel taskModel,
+ Map<String, SystemAdmin> systemNameSystemAdminMap,
ExecutorService backupExecutor,
MetricsRegistry taskInstanceMetricsRegistry,
Config config,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index 0772449f4..9f3117156 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -40,6 +40,7 @@ import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
@@ -66,13 +67,14 @@ public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
public TaskBackupManager getBackupManager(JobContext jobContext,
ContainerModel containerModel,
TaskModel taskModel,
+ Map<String, SystemAdmin> systemNameSystemAdminsMap,
ExecutorService backupExecutor,
MetricsRegistry metricsRegistry,
Config config,
Clock clock,
File loggedStoreBaseDir,
File nonLoggedStoreBaseDir) {
- SystemAdmins systemAdmins = new SystemAdmins(config);
+ SystemAdmins systemAdmins = new SystemAdmins(systemNameSystemAdminsMap);
StorageConfig storageConfig = new StorageConfig(config);
Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
index e2512b489..dc6059840 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage.blobstore;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
@@ -41,6 +42,7 @@ import org.apache.samza.storage.TaskBackupManager;
import org.apache.samza.storage.TaskRestoreManager;
import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ReflectionUtil;
@@ -51,6 +53,7 @@ public class BlobStoreStateBackendFactory implements StateBackendFactory {
JobContext jobContext,
ContainerModel containerModel,
TaskModel taskModel,
+ Map<String, SystemAdmin> systemNameSystemAdminsMap,
ExecutorService backupExecutor,
MetricsRegistry metricsRegistry,
Config config,
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bba782525..3d3847470 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -572,6 +572,7 @@ object SamzaContainer extends Logging {
info ("Got task side input SSPs: %s" format taskSideInputSSPs)
val taskBackupManagerMap = new util.HashMap[String, TaskBackupManager]()
+ val systemAdminsMap = systemAdmins.getSystemAdmins
stateStorageBackendBackupFactories.asJava.forEach(new Consumer[StateBackendFactory] {
override def accept(factory: StateBackendFactory): Unit = {
val taskMetricsRegistry =
@@ -579,7 +580,7 @@ object SamzaContainer extends Logging {
taskInstanceMetrics.get(taskName).isDefined) taskInstanceMetrics.get(taskName).get.registry
else new MetricsRegistryMap
val taskBackupManager = factory.getBackupManager(jobContext, containerModel,
- taskModel, commitThreadPool, taskMetricsRegistry, config, SystemClock.instance(),
+ taskModel, systemAdminsMap, commitThreadPool, taskMetricsRegistry, config, SystemClock.instance,
loggedStorageBaseDir, nonLoggedStorageBaseDir)
taskBackupManagerMap.put(factory.getClass.getName, taskBackupManager)
}