You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/06/08 00:58:05 UTC
samza git commit: SAMZA-1329: Switch SamzaTaskProxy to use
LocalityManager.
Repository: samza
Updated Branches:
refs/heads/master 451b2e538 -> 5418a981d
SAMZA-1329: Switch SamzaTaskProxy to use LocalityManager.
TasksResource(/tasks/) in samza-rest is relying on `JobModelManager.readJobModel` to get jobmodel from (CoordinatorStream, jobConfig). This created binary dependencies of systemstreams defined in task.inputs of job config into samza-rest. Managing those dependencies is hard and unnecessary.
This PR updates `TasksResource` to use `LocalityManager` to read task to container locality.
In the future, this api will read jobmodel from `SamzaMetadatasystem` store.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #219 from shanthoosh/fix-2
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5418a981
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5418a981
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5418a981
Branch: refs/heads/master
Commit: 5418a981d4f9de99aac4e7b3d565373d6dd328df
Parents: 451b2e5
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Wed Jun 7 17:57:54 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Jun 7 17:57:54 2017 -0700
----------------------------------------------------------------------
.../AbstractCoordinatorStreamManager.java | 20 ++-
.../samza/rest/proxy/task/SamzaTaskProxy.java | 121 +++++++------------
.../samza/rest/resources/TestTasksResource.java | 25 ++--
.../resources/mock/MockInstallationFinder.java | 2 +-
.../rest/resources/mock/MockTaskProxy.java | 49 +++-----
5 files changed, 87 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5418a981/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
index 813234b..9b0d849 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
@@ -48,7 +48,9 @@ public abstract class AbstractCoordinatorStreamManager {
* Starts the underlying coordinator stream producer and consumer.
*/
public void start() {
- coordinatorStreamProducer.start();
+ if (coordinatorStreamProducer != null) {
+ coordinatorStreamProducer.start();
+ }
if (coordinatorStreamConsumer != null) {
coordinatorStreamConsumer.start();
}
@@ -61,7 +63,9 @@ public abstract class AbstractCoordinatorStreamManager {
if (coordinatorStreamConsumer != null) {
coordinatorStreamConsumer.stop();
}
- coordinatorStreamProducer.stop();
+ if (coordinatorStreamProducer != null) {
+ coordinatorStreamProducer.stop();
+ }
}
/**
@@ -69,6 +73,10 @@ public abstract class AbstractCoordinatorStreamManager {
* @param message message which should be sent to producer
*/
public void send(CoordinatorStreamMessage message) {
+ if (coordinatorStreamProducer == null) {
+ throw new UnsupportedOperationException(String.format("CoordinatorStreamProducer is not initialized in the AbstractCoordinatorStreamManager. "
+ + "manager registered source: %s, input source: %s", this.source, source));
+ }
coordinatorStreamProducer.send(message);
}
@@ -89,7 +97,9 @@ public abstract class AbstractCoordinatorStreamManager {
* Register the coordinator stream consumer.
*/
protected void registerCoordinatorStreamConsumer() {
- coordinatorStreamConsumer.register();
+ if (coordinatorStreamConsumer != null) {
+ coordinatorStreamConsumer.register();
+ }
}
/**
@@ -97,7 +107,9 @@ public abstract class AbstractCoordinatorStreamManager {
* @param source the source to register
*/
protected void registerCoordinatorStreamProducer(String source) {
- coordinatorStreamProducer.register(source);
+ if (coordinatorStreamProducer != null) {
+ coordinatorStreamProducer.register(source);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/5418a981/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index a412c08..da7b907 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -34,25 +34,15 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.rest.model.Partition;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.installation.InstallationFinder;
import org.apache.samza.rest.proxy.installation.InstallationRecord;
import org.apache.samza.rest.proxy.job.JobInstance;
-import org.apache.samza.storage.ChangelogPartitionManager;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.ClassLoaderHelper;
-import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,8 +58,6 @@ public class SamzaTaskProxy implements TaskProxy {
private static final MetricsRegistryMap METRICS_REGISTRY = new MetricsRegistryMap();
- private static final String SOURCE = "SamzaTaskProxy";
-
private final TaskResourceConfig taskResourceConfig;
private final InstallationFinder installFinder;
@@ -85,28 +73,37 @@ public class SamzaTaskProxy implements TaskProxy {
* {@inheritDoc}
*/
@Override
- public List<Task> getTasks(JobInstance jobInstance)
- throws IOException, InterruptedException {
+ public List<Task> getTasks(JobInstance jobInstance) throws IOException, InterruptedException {
Preconditions.checkArgument(installFinder.isInstalled(jobInstance),
String.format("Invalid job instance : %s", jobInstance));
- JobModel jobModel = getJobModel(jobInstance);
- StorageConfig storageConfig = new StorageConfig(jobModel.getConfig());
-
- List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
- Map<String, String> containerLocality = jobModel.getAllContainerLocality();
- List<Task> tasks = new ArrayList<>();
- for (ContainerModel containerModel : jobModel.getContainers().values()) {
- String containerId = containerModel.getProcessorId();
- String host = containerLocality.get(containerId);
- for (TaskModel taskModel : containerModel.getTasks().values()) {
- String taskName = taskModel.getTaskName().getTaskName();
- List<Partition> partitions = taskModel.getSystemStreamPartitions()
- .stream()
- .map(Partition::new).collect(Collectors.toList());
- tasks.add(new Task(host, taskName, containerId, partitions, storeNames));
+ CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer = null;
+ try {
+ coordinatorStreamSystemConsumer = initializeCoordinatorStreamConsumer(jobInstance);
+ return readTasksFromCoordinatorStream(coordinatorStreamSystemConsumer);
+ } finally {
+ if (coordinatorStreamSystemConsumer != null) {
+ coordinatorStreamSystemConsumer.stop();
}
}
- return tasks;
+ }
+
+ /**
+ * Initialize {@link CoordinatorStreamSystemConsumer} based upon {@link JobInstance} parameter.
+ * @param jobInstance the job instance to get CoordinatorStreamSystemConsumer for.
+ * @return built and initialized CoordinatorStreamSystemConsumer.
+ */
+ protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
+ CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
+ Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
+ LOG.debug("Using config: {} to create coordinatorStream consumer.", coordinatorSystemConfig);
+ CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
+ LOG.debug("Registering coordinator system stream consumer.");
+ consumer.register();
+ LOG.debug("Starting coordinator system stream consumer.");
+ consumer.start();
+ LOG.debug("Bootstrapping coordinator system stream consumer.");
+ consumer.bootstrap();
+ return consumer;
}
/**
@@ -129,53 +126,21 @@ public class SamzaTaskProxy implements TaskProxy {
}
/**
- * Retrieves the jobModel from the jobCoordinator.
- * @param jobInstance the job instance (jobId, jobName).
- * @return the JobModel fetched from the coordinator stream.
+ * Builds list of {@link Task} from job model in coordinator stream.
+ * @param consumer system consumer associated with a job's coordinator stream.
+ * @return list of {@link Task} constructed from job model in coordinator stream.
*/
- protected JobModel getJobModel(JobInstance jobInstance) {
- CoordinatorStreamSystemConsumer coordinatorSystemConsumer = null;
- CoordinatorStreamSystemProducer coordinatorSystemProducer = null;
- try {
- CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
- Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
- LOG.info("Using config: {} to create coordinatorStream producer and consumer.", coordinatorSystemConfig);
- coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
- coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, METRICS_REGISTRY);
- LOG.info("Registering coordinator system stream consumer.");
- coordinatorSystemConsumer.register();
- LOG.debug("Starting coordinator system stream consumer.");
- coordinatorSystemConsumer.start();
- LOG.debug("Bootstrapping coordinator system stream consumer.");
- coordinatorSystemConsumer.bootstrap();
- LOG.info("Registering coordinator system stream producer.");
- coordinatorSystemProducer.register(SOURCE);
-
- Config config = coordinatorSystemConsumer.getConfig();
- LOG.info("Got config from coordinatorSystemConsumer: {}.", config);
- ChangelogPartitionManager changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE);
- changelogManager.start();
- LocalityManager localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer);
- localityManager.start();
-
- String jobCoordinatorSystemName = config.get(JobConfig.JOB_COORDINATOR_SYSTEM());
-
- /**
- * Select job coordinator system properties from config and instantiate SystemAdmin for it alone.
- * Instantiating SystemAdmin's for other input/output systems defined in config is unnecessary.
- */
- Config systemAdminConfig = config.subset(String.format("systems.%s", jobCoordinatorSystemName), false);
- scala.collection.immutable.Map<String, SystemAdmin> systemAdmins = JobModelManager.getSystemAdmins(systemAdminConfig);
- StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
- Map<TaskName, Integer> changeLogPartitionMapping = changelogManager.readChangeLogPartitionMapping();
- return JobModelManager.readJobModel(config, changeLogPartitionMapping, localityManager, streamMetadataCache, null);
- } finally {
- if (coordinatorSystemConsumer != null) {
- coordinatorSystemConsumer.stop();
- }
- if (coordinatorSystemProducer != null) {
- coordinatorSystemProducer.stop();
- }
- }
+ protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
+ LocalityManager localityManager = new LocalityManager(null, consumer);
+ Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
+ Map<String, String> taskNameToContainerIdMapping = localityManager.getTaskAssignmentManager().readTaskAssignment();
+ StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
+ List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
+ return taskNameToContainerIdMapping.entrySet()
+ .stream()
+ .map(entry -> {
+ String hostName = containerIdToHostMapping.get(entry.getValue()).get(SetContainerHostMapping.HOST_KEY);
+ return new Task(hostName, entry.getKey(), entry.getValue(), new ArrayList<>(), storeNames);
+ }).collect(Collectors.toList());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5418a981/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
index 63a9958..a9dbfcf 100644
--- a/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/TestTasksResource.java
@@ -18,17 +18,14 @@
*/
package org.apache.samza.rest.resources;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import org.apache.samza.config.MapConfig;
import org.apache.samza.rest.SamzaRestApplication;
import org.apache.samza.rest.SamzaRestConfig;
-import org.apache.samza.rest.model.Partition;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.task.TaskResourceConfig;
import org.apache.samza.rest.resources.mock.MockJobProxy;
@@ -46,7 +43,7 @@ import static org.junit.Assert.assertTrue;
public class TestTasksResource extends JerseyTest {
- ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper();
+ private ObjectMapper objectMapper = SamzaObjectMapper.getObjectMapper();
@Override
protected Application configure() {
@@ -59,31 +56,26 @@ public class TestTasksResource extends JerseyTest {
}
@Test
- public void testGetTasks()
- throws IOException {
+ public void testGetTasks() throws IOException {
String requestUrl = String.format("v1/jobs/%s/%s/tasks", "testJobName", "testJobId");
Response response = target(requestUrl).request().get();
assertEquals(200, response.getStatus());
Task[] tasks = objectMapper.readValue(response.readEntity(String.class), Task[].class);
assertEquals(2, tasks.length);
- List<Partition> partitionList = ImmutableList.of(new Partition(MockTaskProxy.SYSTEM_NAME,
- MockTaskProxy.STREAM_NAME,
- MockTaskProxy.PARTITION_ID));
- assertEquals(null, tasks[0].getPreferredHost());
+ assertEquals(MockTaskProxy.TASK_1_PREFERRED_HOST, tasks[0].getPreferredHost());
assertEquals(MockTaskProxy.TASK_1_CONTAINER_ID, tasks[0].getContainerId());
assertEquals(MockTaskProxy.TASK_1_NAME, tasks[0].getTaskName());
- assertEquals(partitionList, tasks[0].getPartitions());
+ assertEquals(MockTaskProxy.PARTITIONS, tasks[0].getPartitions());
- assertEquals(null, tasks[1].getPreferredHost());
+ assertEquals(MockTaskProxy.TASK_2_PREFERRED_HOST, tasks[1].getPreferredHost());
assertEquals(MockTaskProxy.TASK_2_CONTAINER_ID, tasks[1].getContainerId());
assertEquals(MockTaskProxy.TASK_2_NAME, tasks[1].getTaskName());
- assertEquals(partitionList, tasks[1].getPartitions());
+ assertEquals(MockTaskProxy.PARTITIONS, tasks[1].getPartitions());
}
@Test
- public void testGetTasksWithInvalidJobName()
- throws IOException {
+ public void testGetTasksWithInvalidJobName() throws IOException {
String requestUrl = String.format("v1/jobs/%s/%s/tasks", "BadJobName", MockJobProxy.JOB_INSTANCE_4_ID);
Response resp = target(requestUrl).request().get();
assertEquals(400, resp.getStatus());
@@ -93,8 +85,7 @@ public class TestTasksResource extends JerseyTest {
}
@Test
- public void testGetTasksWithInvalidJobId()
- throws IOException {
+ public void testGetTasksWithInvalidJobId() throws IOException {
String requestUrl = String.format("v1/jobs/%s/%s/tasks", MockJobProxy.JOB_INSTANCE_1_NAME, "BadJobId");
Response resp = target(requestUrl).request().get();
assertEquals(400, resp.getStatus());
http://git-wip-us.apache.org/repos/asf/samza/blob/5418a981/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
index aa6a0ba..6295ac5 100644
--- a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockInstallationFinder.java
@@ -29,7 +29,7 @@ public class MockInstallationFinder implements InstallationFinder {
@Override
public boolean isInstalled(JobInstance jobInstance) {
- return true;
+ return !jobInstance.getJobId().contains("Bad") && !jobInstance.getJobName().contains("Bad");
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5418a981/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
index de741ba..afebf1d 100644
--- a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
@@ -18,34 +18,30 @@
*/
package org.apache.samza.rest.resources.mock;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-import org.apache.samza.Partition;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.rest.model.Partition;
+import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.job.JobInstance;
import org.apache.samza.rest.proxy.task.SamzaTaskProxy;
import org.apache.samza.rest.proxy.task.TaskResourceConfig;
-import org.apache.samza.system.SystemStreamPartition;
+import org.mockito.Mockito;
public class MockTaskProxy extends SamzaTaskProxy {
- public static final String SYSTEM_NAME = "testSystem";
- public static final String STREAM_NAME = "testStream";
- public static final Integer PARTITION_ID = 1;
- public static final Set<SystemStreamPartition> SYSTEM_STREAM_PARTITIONS = ImmutableSet.of(
- new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME, new Partition(PARTITION_ID)));
+ public static final List<Partition> PARTITIONS = ImmutableList.of();
public static final String TASK_1_NAME = "Task1";
public static final String TASK_1_CONTAINER_ID = "1";
- public static final Partition CHANGE_LOG_PARTITION = new Partition(0);
+ public static final String TASK_1_PREFERRED_HOST = "TASK_1_PREFERRED_HOST";
+ public static final List<String> TASK_1_STORE_NAMES = ImmutableList.of("Task1Store1", "Task1Store2");
public static final String TASK_2_NAME = "Task2";
public static final String TASK_2_CONTAINER_ID = "2";
+ public static final String TASK_2_PREFERRED_HOST = "TASK_1_PREFERRED_HOST";
+ public static final List<String> TASK_2_STORE_NAMES = ImmutableList.of("Task2Store1", "Task2Store2", "Task2Store3");
public MockTaskProxy() {
super(new TaskResourceConfig(new MapConfig()),
@@ -53,20 +49,13 @@ public class MockTaskProxy extends SamzaTaskProxy {
}
@Override
- protected JobModel getJobModel(JobInstance jobInstance) {
- if (jobInstance.getJobId().contains("Bad")
- || jobInstance.getJobName().contains("Bad")) {
- throw new IllegalArgumentException("No tasks found.");
- }
- TaskModel task1Model = new TaskModel(new TaskName(TASK_1_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
- TaskModel task2Model = new TaskModel(new TaskName(TASK_2_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
- ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID, 1,
- ImmutableMap.of(new TaskName(TASK_1_NAME),
- task1Model));
- ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID, 2,
- ImmutableMap.of(new TaskName(TASK_2_NAME),
- task2Model));
- return new JobModel(new MapConfig(), ImmutableMap.of(TASK_1_CONTAINER_ID, task1ContainerModel,
- TASK_2_CONTAINER_ID, task2ContainerModel));
+ protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
+ return Mockito.mock(CoordinatorStreamSystemConsumer.class);
+ }
+
+ @Override
+ protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
+ return ImmutableList.of(new Task(TASK_1_PREFERRED_HOST, TASK_1_NAME, TASK_1_CONTAINER_ID, PARTITIONS, TASK_1_STORE_NAMES),
+ new Task(TASK_2_PREFERRED_HOST, TASK_2_NAME, TASK_2_CONTAINER_ID, PARTITIONS, TASK_2_STORE_NAMES));
}
}