You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2023/03/10 21:14:13 UTC

[samza] branch master updated: Fixed line-length related formatting issues.

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 542cd48c7 Fixed line-length related formatting issues.
     new 325e463db Merge pull request #1656 from prateekm/side-inputs-blob-store-restore-part-2
542cd48c7 is described below

commit 542cd48c75c6b4e697ee460e5f0724a90c0f0400
Author: Prateek Maheshwari <pm...@linkedin.com>
AuthorDate: Fri Mar 10 12:48:29 2023 -0800

    Fixed line-length related formatting issues.
---
 .../NonTransactionalStateTaskRestoreManager.java   |  5 +--
 .../TransactionalStateTaskRestoreManager.java      |  5 +--
 .../samza/storage/ContainerStorageManager.java     | 43 +++++++++++++---------
 .../apache/samza/storage/SideInputsManager.java    | 16 ++++----
 4 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index e2a374282..88e4a5bce 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -346,9 +346,8 @@ class NonTransactionalStateTaskRestoreManager implements TaskRestoreManager {
           taskModel.getTaskMode());
       StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
           StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
-          taskModel, jobContext, containerContext,
-          serdes, metricsRegistry, messageCollector,
-          this.config);
+          taskModel, jobContext, containerContext, serdes,
+          metricsRegistry, messageCollector, this.config);
       storageEngines.put(storeName, engine);
     });
     return storageEngines;
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 1b3762a0c..104bc2c5f 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -199,9 +199,8 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
           taskModel.getTaskMode());
       StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
           StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
-          taskModel, jobContext, containerContext,
-          serdes, metricsRegistry, messageCollector,
-          this.config);
+          taskModel, jobContext, containerContext, serdes,
+          metricsRegistry, messageCollector, this.config);
       storageEngines.put(storeName, engine);
     });
     return storageEngines;
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 4b38fd855..64e52c595 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -172,8 +172,8 @@ public class ContainerStorageManager {
     this.activeTaskChangelogSystemStreams =
         ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel);
 
-    LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams = {}",
-        this.activeTaskChangelogSystemStreams, sideInputSystemStreams);
+    LOG.info("Starting with changelogSystemStreams = {} activeTaskChangelogSystemStreams = {} sideInputSystemStreams = {}",
+        changelogSystemStreams, activeTaskChangelogSystemStreams, sideInputSystemStreams);
 
     if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
       LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure"
@@ -183,15 +183,17 @@ public class ContainerStorageManager {
 
     this.storeDirectoryPaths = new HashSet<>();
 
-    this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(changelogSystemStreams, storageEngineFactories, sideInputStoreNames, storeDirectoryPaths, containerModel,
-        jobContext, containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config
-    );
+    this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
+        activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
+        storeDirectoryPaths, containerModel, jobContext, containerContext,
+        taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
 
     // Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams'
     // (which is actually activeTaskChangelogSystemStreams) vs the passed in changelogSystemStreams.
     // create a map from storeNames to changelog system consumers (1 per system in activeTaskChangelogSystemStreams)
-    this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(activeTaskChangelogSystemStreams,
-        systemFactories, samzaContainerMetrics.registry(), config);
+    this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
+        activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);
 
     JobConfig jobConfig = new JobConfig(config);
     int restoreThreadPoolSize =
@@ -226,9 +228,8 @@ public class ContainerStorageManager {
         storageEngineFactories, storeDirectoryPaths,
         containerModel, jobContext, containerContext,
         samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
-        streamMetadataCache, systemAdmins, serdeManager, serdes,
-        storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
-    );
+        streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
 
     // blocks until initial side inputs restore is complete
     sideInputsManager.start();
@@ -270,12 +271,16 @@ public class ContainerStorageManager {
       }
       taskCheckpoints.put(taskName, taskCheckpoint);
       Map<String, Set<String>> backendFactoryStoreNames =
-          ContainerStorageManagerUtil.getBackendFactoryStoreNames(nonSideInputStoreNames, taskCheckpoint,
-              new StorageConfig(config));
+          ContainerStorageManagerUtil.getBackendFactoryStoreNames(
+              nonSideInputStoreNames, taskCheckpoint, new StorageConfig(config));
       Map<String, TaskRestoreManager> taskStoreRestoreManagers =
-          ContainerStorageManagerUtil.createTaskRestoreManagers(taskName, backendFactoryStoreNames, restoreStateBackendFactories,
-              storageEngineFactories, storeConsumers, inMemoryStores, systemAdmins, restoreExecutor, taskModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
-          );
+          ContainerStorageManagerUtil.createTaskRestoreManagers(
+              taskName, backendFactoryStoreNames, restoreStateBackendFactories,
+              storageEngineFactories, storeConsumers,
+              inMemoryStores, systemAdmins, restoreExecutor,
+              taskModel, jobContext, containerContext,
+              samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
+              loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
       taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
     });
 
@@ -356,9 +361,11 @@ public class ContainerStorageManager {
     this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
 
     // Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
-    this.taskStores = ContainerStorageManagerUtil.createTaskStores(nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames, this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths, this.containerModel,
-        this.jobContext, this.containerContext, this.serdes, this.taskInstanceMetrics,
-        this.taskInstanceCollectors, this.storageManagerUtil,
+    this.taskStores = ContainerStorageManagerUtil.createTaskStores(
+        nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames,
+        this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
+        this.containerModel, this.jobContext, this.containerContext,
+        this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
         this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);
 
     // Add in memory stores
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 2707ecff8..96390ba3b 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -145,10 +145,11 @@ public class SideInputsManager {
     this.config = config;
 
     // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
-    this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories,
-        sideInputStoreNames, activeTaskChangelogSystemStreams, storeDirectoryPaths, containerModel,
-        jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors,
-        storageManagerUtil,
+    this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
+        sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
+        activeTaskChangelogSystemStreams, storeDirectoryPaths,
+        containerModel, jobContext, containerContext, serdes,
+        taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
         loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
 
     this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs,
@@ -379,8 +380,8 @@ public class SideInputsManager {
       Config config, Clock clock) {
     // creating sideInput store processors, one per store per task
     Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
-        createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs, containerModel, serdes, new StorageConfig(config)
-        );
+        createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs,
+            containerModel, serdes, new StorageConfig(config));
 
     Map<SystemStreamPartition, TaskSideInputHandler> handlers = new HashMap<>();
 
@@ -454,7 +455,8 @@ public class SideInputsManager {
               config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
 
         } else {
-          // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
+          // if this is an active-task with a side-input store but no sideinput-processor-factory defined in config,
+          // we rely on upstream validations to fail the deploy
 
           // if this is a standby-task and the store is a non-side-input changelog store
           // we creating identity sideInputProcessor for stores of standbyTasks