You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/04 02:08:46 UTC
samza git commit: Moving store test to TestContainerStorageManager
from TestSamzaContainer
Repository: samza
Updated Branches:
refs/heads/master 7fc370407 -> f4ba9cdc1
Moving store test to TestContainerStorageManager from TestSamzaContainer
A test in TestSamzaContainer needed to be moved to TestContainerStorageManager because the restore logic was moved there.
Minor changes in TestSamzaContainer and ContainerStorageManager.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #838 from rmatharu/storeTest-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f4ba9cdc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f4ba9cdc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f4ba9cdc
Branch: refs/heads/master
Commit: f4ba9cdc18651b62f833229e00e2fdd07692dfc5
Parents: 7fc3704
Author: Ray Matharu <rm...@linkedin.com>
Authored: Mon Dec 3 18:08:36 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Dec 3 18:08:36 2018 -0800
----------------------------------------------------------------------
.../apache/samza/storage/StorageRecovery.java | 23 +++++++++---------
.../apache/samza/container/SamzaContainer.scala | 4 ++--
.../samza/storage/ContainerStorageManager.java | 25 ++++++++++----------
.../samza/container/TestSamzaContainer.scala | 20 ++++------------
.../processor/StreamProcessorTestUtils.scala | 3 ++-
.../storage/TestContainerStorageManager.java | 19 ++++++++++++---
.../samza/storage/TestTaskStorageManager.scala | 3 +++
7 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index be074ee..64ae310 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -21,7 +21,6 @@ package org.apache.samza.storage;
import java.io.File;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -32,6 +31,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.ContainerContextImpl;
import org.apache.samza.context.JobContextImpl;
@@ -72,10 +72,11 @@ public class StorageRecovery extends CommandLine {
private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<>();
private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<>();
private Map<String, ContainerModel> containers = new HashMap<>();
- private List<TaskStorageManager> taskStorageManagers = new ArrayList<>();
+ private ContainerStorageManager containerStorageManager;
private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
private SystemAdmins systemAdmins = null;
+
/**
* Construct the StorageRecovery
*
@@ -100,7 +101,7 @@ public class StorageRecovery extends CommandLine {
getContainerModels();
getChangeLogSystemStreamsAndStorageFactories();
getChangeLogMaxPartitionNumber();
- getTaskStorageManagers();
+ getContainerStorageManager();
}
/**
@@ -112,11 +113,8 @@ public class StorageRecovery extends CommandLine {
log.info("start recovering...");
systemAdmins.start();
- for (TaskStorageManager taskStorageManager : taskStorageManagers) {
- taskStorageManager.init();
- taskStorageManager.stopStores();
- log.debug("restored " + taskStorageManager.toString());
- }
+ this.containerStorageManager.start();
+ this.containerStorageManager.shutdown();
systemAdmins.stop();
log.info("successfully recovered in " + storeBaseDir.toString());
@@ -201,8 +199,10 @@ public class StorageRecovery extends CommandLine {
* List<TaskStorageManager>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void getTaskStorageManagers() {
+ private void getContainerStorageManager() {
Clock clock = SystemClock.instance();
+ Map<TaskName, TaskStorageManager> taskStorageManagers = new HashMap<>();
+ HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
// don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways
SSPMetadataCache sspMetadataCache =
@@ -213,7 +213,6 @@ public class StorageRecovery extends CommandLine {
ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap());
for (TaskModel taskModel : containerModel.getTasks().values()) {
- HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
for (Entry<String, StorageEngineFactory<?, ?>> entry : storageEngineFactories.entrySet()) {
String storeName = entry.getKey();
@@ -253,8 +252,10 @@ public class StorageRecovery extends CommandLine {
new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(),
new SystemClock());
- taskStorageManagers.add(taskStorageManager);
+ taskStorageManagers.put(taskModel.getTaskName(), taskStorageManager);
}
}
+
+ this.containerStorageManager = new ContainerStorageManager(taskStorageManagers, storeConsumers, null);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ed50719..94bc138 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -515,7 +515,7 @@ object SamzaContainer extends Logging {
info("Created store system consumers: %s" format storeSystemConsumers)
- var taskStorageManagers : Map[TaskInstance, TaskStorageManager] = Map()
+ var taskStorageManagers : Map[TaskName, TaskStorageManager] = Map()
// Create taskInstances
val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
@@ -679,7 +679,7 @@ object SamzaContainer extends Logging {
val taskInstance = createTaskInstance(task)
- taskStorageManagers += taskInstance -> storageManager
+ taskStorageManagers += taskInstance.taskName -> storageManager
(taskName, taskInstance)
}).toMap
http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 5fc5573..c39d6e7 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.samza.SamzaException;
import org.apache.samza.container.SamzaContainerMetrics;
-import org.apache.samza.container.TaskInstance;
+import org.apache.samza.container.TaskName;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.system.SystemConsumer;
import org.slf4j.Logger;
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
public class ContainerStorageManager {
private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
- private final Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+ private final Map<TaskName, TaskStorageManager> taskStorageManagers;
private final SamzaContainerMetrics samzaContainerMetrics;
// Mapping of from storeSystemNames to SystemConsumers
@@ -61,7 +61,7 @@ public class ContainerStorageManager {
// Naming convention to be used for restore threads
private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
- public ContainerStorageManager(Map<TaskInstance, TaskStorageManager> taskStorageManagers,
+ public ContainerStorageManager(Map<TaskName, TaskStorageManager> taskStorageManagers,
Map<String, SystemConsumer> systemConsumers, SamzaContainerMetrics samzaContainerMetrics) {
this.taskStorageManagers = taskStorageManagers;
this.systemConsumers = systemConsumers;
@@ -129,30 +129,31 @@ public class ContainerStorageManager {
*/
private class TaskRestoreCallable implements Callable<Void> {
- private TaskInstance taskInstance;
+ private TaskName taskName;
private TaskStorageManager taskStorageManager;
private SamzaContainerMetrics samzaContainerMetrics;
- public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskInstance taskInstance,
+ public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
TaskStorageManager taskStorageManager) {
this.samzaContainerMetrics = samzaContainerMetrics;
- this.taskInstance = taskInstance;
+ this.taskName = taskName;
this.taskStorageManager = taskStorageManager;
}
@Override
public Void call() {
long startTime = System.currentTimeMillis();
- LOG.info("Starting stores in task instance {}", this.taskInstance.taskName().getTaskName());
+ LOG.info("Starting stores in task instance {}", this.taskName.getTaskName());
taskStorageManager.restoreStores();
long timeToRestore = System.currentTimeMillis() - startTime;
- Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics()
- .getOrDefault(this.taskInstance.taskName().getTaskName(), null);
- if (taskGauge != null) {
- taskGauge.set(timeToRestore);
- }
+ if (this.samzaContainerMetrics != null) {
+ Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName, null);
+ if (taskGauge != null) {
+ taskGauge.set(timeToRestore);
+ }
+ }
return null;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index eca4673..760e358 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -28,14 +28,13 @@ import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
import org.apache.samza.metrics.{Gauge, Timer}
+import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager}
import org.apache.samza.system._
import org.apache.samza.{Partition, SamzaContainerStatus}
import org.junit.Assert._
import org.junit.{Before, Test}
import org.mockito.Matchers.{any, notNull}
import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
@@ -67,6 +66,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
@Mock
private var samzaContainerListener: SamzaContainerListener = null
+ @Mock
+ private var containerStorageManager: ContainerStorageManager = null
+
private var samzaContainer: SamzaContainer = null
@Before
@@ -153,18 +155,6 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
}
@Test
- def testStartStoresIncrementsCounter() {
- when(this.taskInstance.taskName).thenReturn(TASK_NAME)
- val restoreGauge = mock[Gauge[Long]]
- when(this.metrics.taskStoreRestorationMetrics).thenReturn(Map(TASK_NAME -> restoreGauge))
-
- this.samzaContainer.startStores
- val restoreGaugeValueCaptor = ArgumentCaptor.forClass(classOf[Long])
- verify(restoreGauge).set(restoreGaugeValueCaptor.capture())
- assertTrue(restoreGaugeValueCaptor.getValue >= 1)
- }
-
- @Test
def testApplicationContainerContext() {
val orderVerifier = inOrder(this.applicationContainerContext, this.runLoop)
this.samzaContainer.run
@@ -278,7 +268,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
this.producerMultiplexer,
metrics,
containerContext = this.containerContext,
- applicationContainerContextOption = applicationContainerContext, containerStorageManager = null)
+ applicationContainerContextOption = applicationContainerContext, containerStorageManager = containerStorageManager)
this.samzaContainer.setContainerListener(this.samzaContainerListener)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index d7c71fa..69bae4b 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -26,6 +26,7 @@ import org.apache.samza.container._
import org.apache.samza.context.{ContainerContext, JobContext}
import org.apache.samza.job.model.TaskModel
import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.storage.ContainerStorageManager
import org.apache.samza.system._
import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
@@ -68,7 +69,7 @@ object StreamProcessorTestUtils {
metrics = new SamzaContainerMetrics,
containerContext = containerContext,
applicationContainerContextOption = None,
- containerStorageManager = null)
+ containerStorageManager = Mockito.mock(classOf[ContainerStorageManager]))
container
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index dba8678..5e71efc 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.Gauge;
import org.apache.samza.system.SystemConsumer;
import org.junit.Assert;
import org.junit.Before;
@@ -35,7 +36,7 @@ public class TestContainerStorageManager {
private ContainerStorageManager containerStorageManager;
private Map<String, SystemConsumer> systemConsumers;
- private Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+ private Map<TaskName, TaskStorageManager> taskStorageManagers;
private SamzaContainerMetrics samzaContainerMetrics;
private CountDownLatch taskStorageManagersRestoreStoreCount;
@@ -45,6 +46,8 @@ public class TestContainerStorageManager {
private CountDownLatch systemConsumerStartCount;
private CountDownLatch systemConsumerStopCount;
+ private Map<TaskName, Gauge<Object>> taskRestoreMetricGauges;
+
/**
* Utility method for creating a mocked taskInstance and taskStorageManager and adding it to the map.
* @param taskname the desired taskname.
@@ -71,11 +74,15 @@ public class TestContainerStorageManager {
return null;
}).when(mockTaskStorageManager).restoreStores();
- taskStorageManagers.put(mockTaskInstance, mockTaskStorageManager);
+ taskStorageManagers.put(new TaskName(taskname), mockTaskStorageManager);
+
+ Gauge testGauge = Mockito.mock(Gauge.class);
+ this.taskRestoreMetricGauges.put(new TaskName(taskname), testGauge);
}
@Before
public void setUp() {
+ taskRestoreMetricGauges = new HashMap<>();
systemConsumers = new HashMap<>();
taskStorageManagers = new HashMap<>();
@@ -93,6 +100,7 @@ public class TestContainerStorageManager {
// mock container metrics
samzaContainerMetrics = Mockito.mock(SamzaContainerMetrics.class);
+ Mockito.when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges);
// mock and setup sysconsumers
SystemConsumer mockSystemConsumer = Mockito.mock(SystemConsumer.class);
@@ -112,7 +120,7 @@ public class TestContainerStorageManager {
}
@Test
- public void testParallelism() {
+ public void testParallelismAndMetrics() {
this.containerStorageManager.start();
this.containerStorageManager.shutdown();
Assert.assertTrue("init count should be 0", this.taskStorageManagersInitCount.getCount() == 0);
@@ -121,5 +129,10 @@ public class TestContainerStorageManager {
Assert.assertTrue("systemConsumerStopCount count should be 0", this.systemConsumerStopCount.getCount() == 0);
Assert.assertTrue("systemConsumerStartCount count should be 0", this.systemConsumerStartCount.getCount() == 0);
+
+ for (Gauge gauge : taskRestoreMetricGauges.values()) {
+ Assert.assertTrue("Restoration time gauge value should be invoked atleast once", Mockito.mockingDetails(gauge).getInvocations().size() >= 1);
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 0b945cb..ffdceca 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -106,6 +106,9 @@ class TestTaskStorageManager extends MockitoSugar {
taskManager.init
+ // mocking restore (issued by ContainerStorageManager)
+ mockStorageEngine.restore(mock[util.Iterator[IncomingMessageEnvelope]])
+
assertTrue(storeFile.exists())
assertFalse(offsetFile.exists())
verify(mockSystemConsumer).register(ssp, "0")