You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2023/03/10 21:14:13 UTC
[samza] branch master updated: Fixed line-length related formatting issues.
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 542cd48c7 Fixed line-length related formatting issues.
new 325e463db Merge pull request #1656 from prateekm/side-inputs-blob-store-restore-part-2
542cd48c7 is described below
commit 542cd48c75c6b4e697ee460e5f0724a90c0f0400
Author: Prateek Maheshwari <pm...@linkedin.com>
AuthorDate: Fri Mar 10 12:48:29 2023 -0800
Fixed line-length related formatting issues.
---
.../NonTransactionalStateTaskRestoreManager.java | 5 +--
.../TransactionalStateTaskRestoreManager.java | 5 +--
.../samza/storage/ContainerStorageManager.java | 43 +++++++++++++---------
.../apache/samza/storage/SideInputsManager.java | 16 ++++----
4 files changed, 38 insertions(+), 31 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index e2a374282..88e4a5bce 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -346,9 +346,8 @@ class NonTransactionalStateTaskRestoreManager implements TaskRestoreManager {
taskModel.getTaskMode());
StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
- taskModel, jobContext, containerContext,
- serdes, metricsRegistry, messageCollector,
- this.config);
+ taskModel, jobContext, containerContext, serdes,
+ metricsRegistry, messageCollector, this.config);
storageEngines.put(storeName, engine);
});
return storageEngines;
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 1b3762a0c..104bc2c5f 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -199,9 +199,8 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
taskModel.getTaskMode());
StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
- taskModel, jobContext, containerContext,
- serdes, metricsRegistry, messageCollector,
- this.config);
+ taskModel, jobContext, containerContext, serdes,
+ metricsRegistry, messageCollector, this.config);
storageEngines.put(storeName, engine);
});
return storageEngines;
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 4b38fd855..64e52c595 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -172,8 +172,8 @@ public class ContainerStorageManager {
this.activeTaskChangelogSystemStreams =
ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel);
- LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams = {}",
- this.activeTaskChangelogSystemStreams, sideInputSystemStreams);
+ LOG.info("Starting with changelogSystemStreams = {} activeTaskChangelogSystemStreams = {} sideInputSystemStreams = {}",
+ changelogSystemStreams, activeTaskChangelogSystemStreams, sideInputSystemStreams);
if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure"
@@ -183,15 +183,17 @@ public class ContainerStorageManager {
this.storeDirectoryPaths = new HashSet<>();
- this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(changelogSystemStreams, storageEngineFactories, sideInputStoreNames, storeDirectoryPaths, containerModel,
- jobContext, containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config
- );
+ this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
+ activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
+ storeDirectoryPaths, containerModel, jobContext, containerContext,
+ taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
+ loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
// Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams'
// (which is actually activeTaskChangelogSystemStreams) vs the passed in changelogSystemStreams.
// create a map from storeNames to changelog system consumers (1 per system in activeTaskChangelogSystemStreams)
- this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(activeTaskChangelogSystemStreams,
- systemFactories, samzaContainerMetrics.registry(), config);
+ this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
+ activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);
JobConfig jobConfig = new JobConfig(config);
int restoreThreadPoolSize =
@@ -226,9 +228,8 @@ public class ContainerStorageManager {
storageEngineFactories, storeDirectoryPaths,
containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
- streamMetadataCache, systemAdmins, serdeManager, serdes,
- storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
- );
+ streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
+ loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
// blocks until initial side inputs restore is complete
sideInputsManager.start();
@@ -270,12 +271,16 @@ public class ContainerStorageManager {
}
taskCheckpoints.put(taskName, taskCheckpoint);
Map<String, Set<String>> backendFactoryStoreNames =
- ContainerStorageManagerUtil.getBackendFactoryStoreNames(nonSideInputStoreNames, taskCheckpoint,
- new StorageConfig(config));
+ ContainerStorageManagerUtil.getBackendFactoryStoreNames(
+ nonSideInputStoreNames, taskCheckpoint, new StorageConfig(config));
Map<String, TaskRestoreManager> taskStoreRestoreManagers =
- ContainerStorageManagerUtil.createTaskRestoreManagers(taskName, backendFactoryStoreNames, restoreStateBackendFactories,
- storageEngineFactories, storeConsumers, inMemoryStores, systemAdmins, restoreExecutor, taskModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
- );
+ ContainerStorageManagerUtil.createTaskRestoreManagers(
+ taskName, backendFactoryStoreNames, restoreStateBackendFactories,
+ storageEngineFactories, storeConsumers,
+ inMemoryStores, systemAdmins, restoreExecutor,
+ taskModel, jobContext, containerContext,
+ samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
+ loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
});
@@ -356,9 +361,11 @@ public class ContainerStorageManager {
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
// Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
- this.taskStores = ContainerStorageManagerUtil.createTaskStores(nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames, this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths, this.containerModel,
- this.jobContext, this.containerContext, this.serdes, this.taskInstanceMetrics,
- this.taskInstanceCollectors, this.storageManagerUtil,
+ this.taskStores = ContainerStorageManagerUtil.createTaskStores(
+ nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames,
+ this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
+ this.containerModel, this.jobContext, this.containerContext,
+ this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);
// Add in memory stores
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 2707ecff8..96390ba3b 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -145,10 +145,11 @@ public class SideInputsManager {
this.config = config;
// create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
- this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories,
- sideInputStoreNames, activeTaskChangelogSystemStreams, storeDirectoryPaths, containerModel,
- jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors,
- storageManagerUtil,
+ this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
+ sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
+ activeTaskChangelogSystemStreams, storeDirectoryPaths,
+ containerModel, jobContext, containerContext, serdes,
+ taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs,
@@ -379,8 +380,8 @@ public class SideInputsManager {
Config config, Clock clock) {
// creating sideInput store processors, one per store per task
Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
- createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs, containerModel, serdes, new StorageConfig(config)
- );
+ createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs,
+ containerModel, serdes, new StorageConfig(config));
Map<SystemStreamPartition, TaskSideInputHandler> handlers = new HashMap<>();
@@ -454,7 +455,8 @@ public class SideInputsManager {
config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
} else {
- // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
+ // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config,
+ // we rely on upstream validations to fail the deploy
// if this is a standby-task and the store is a non-side-input changelog store
// we creating identity sideInputProcessor for stores of standbyTasks