You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/04 02:08:46 UTC

samza git commit: Moving store test to TestContainerStorageManager from TestSamzaContainer

Repository: samza
Updated Branches:
  refs/heads/master 7fc370407 -> f4ba9cdc1


Moving store test to TestContainerStorageManager from TestSamzaContainer

The store-restoration test in TestSamzaContainer is moved to TestContainerStorageManager, because the restore logic it exercises now lives in ContainerStorageManager.

Minor changes to TestSamzaContainer and ContainerStorageManager.

Author: Ray Matharu <rm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #838 from rmatharu/storeTest-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f4ba9cdc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f4ba9cdc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f4ba9cdc

Branch: refs/heads/master
Commit: f4ba9cdc18651b62f833229e00e2fdd07692dfc5
Parents: 7fc3704
Author: Ray Matharu <rm...@linkedin.com>
Authored: Mon Dec 3 18:08:36 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Dec 3 18:08:36 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/storage/StorageRecovery.java   | 23 +++++++++---------
 .../apache/samza/container/SamzaContainer.scala |  4 ++--
 .../samza/storage/ContainerStorageManager.java  | 25 ++++++++++----------
 .../samza/container/TestSamzaContainer.scala    | 20 ++++------------
 .../processor/StreamProcessorTestUtils.scala    |  3 ++-
 .../storage/TestContainerStorageManager.java    | 19 ++++++++++++---
 .../samza/storage/TestTaskStorageManager.scala  |  3 +++
 7 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index be074ee..64ae310 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -21,7 +21,6 @@ package org.apache.samza.storage;
 
 import java.io.File;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -32,6 +31,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.ContainerContextImpl;
 import org.apache.samza.context.JobContextImpl;
@@ -72,10 +72,11 @@ public class StorageRecovery extends CommandLine {
   private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<>();
   private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<>();
   private Map<String, ContainerModel> containers = new HashMap<>();
-  private List<TaskStorageManager> taskStorageManagers = new ArrayList<>();
+  private ContainerStorageManager containerStorageManager;
   private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
   private SystemAdmins systemAdmins = null;
 
+
   /**
    * Construct the StorageRecovery
    *
@@ -100,7 +101,7 @@ public class StorageRecovery extends CommandLine {
     getContainerModels();
     getChangeLogSystemStreamsAndStorageFactories();
     getChangeLogMaxPartitionNumber();
-    getTaskStorageManagers();
+    getContainerStorageManager();
   }
 
   /**
@@ -112,11 +113,8 @@ public class StorageRecovery extends CommandLine {
     log.info("start recovering...");
 
     systemAdmins.start();
-    for (TaskStorageManager taskStorageManager : taskStorageManagers) {
-      taskStorageManager.init();
-      taskStorageManager.stopStores();
-      log.debug("restored " + taskStorageManager.toString());
-    }
+    this.containerStorageManager.start();
+    this.containerStorageManager.shutdown();
     systemAdmins.stop();
 
     log.info("successfully recovered in " + storeBaseDir.toString());
@@ -201,8 +199,10 @@ public class StorageRecovery extends CommandLine {
    * List<TaskStorageManager>
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  private void getTaskStorageManagers() {
+  private void getContainerStorageManager() {
     Clock clock = SystemClock.instance();
+    Map<TaskName, TaskStorageManager> taskStorageManagers = new HashMap<>();
+    HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
     // don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways
     SSPMetadataCache sspMetadataCache =
@@ -213,7 +213,6 @@ public class StorageRecovery extends CommandLine {
       ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap());
 
       for (TaskModel taskModel : containerModel.getTasks().values()) {
-        HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
 
         for (Entry<String, StorageEngineFactory<?, ?>> entry : storageEngineFactories.entrySet()) {
           String storeName = entry.getKey();
@@ -253,8 +252,10 @@ public class StorageRecovery extends CommandLine {
             new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(),
             new SystemClock());
 
-        taskStorageManagers.add(taskStorageManager);
+        taskStorageManagers.put(taskModel.getTaskName(), taskStorageManager);
       }
     }
+
+    this.containerStorageManager = new ContainerStorageManager(taskStorageManagers, storeConsumers, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ed50719..94bc138 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -515,7 +515,7 @@ object SamzaContainer extends Logging {
 
     info("Created store system consumers: %s" format storeSystemConsumers)
 
-    var taskStorageManagers : Map[TaskInstance, TaskStorageManager] = Map()
+    var taskStorageManagers : Map[TaskName, TaskStorageManager] = Map()
 
     // Create taskInstances
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
@@ -679,7 +679,7 @@ object SamzaContainer extends Logging {
 
       val taskInstance = createTaskInstance(task)
 
-      taskStorageManagers += taskInstance -> storageManager
+      taskStorageManagers += taskInstance.taskName -> storageManager
       (taskName, taskInstance)
     }).toMap
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 5fc5573..c39d6e7 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.SamzaContainerMetrics;
-import org.apache.samza.container.TaskInstance;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.system.SystemConsumer;
 import org.slf4j.Logger;
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
 public class ContainerStorageManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
-  private final Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+  private final Map<TaskName, TaskStorageManager> taskStorageManagers;
   private final SamzaContainerMetrics samzaContainerMetrics;
 
   // Mapping of from storeSystemNames to SystemConsumers
@@ -61,7 +61,7 @@ public class ContainerStorageManager {
   // Naming convention to be used for restore threads
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
 
-  public ContainerStorageManager(Map<TaskInstance, TaskStorageManager> taskStorageManagers,
+  public ContainerStorageManager(Map<TaskName, TaskStorageManager> taskStorageManagers,
       Map<String, SystemConsumer> systemConsumers, SamzaContainerMetrics samzaContainerMetrics) {
     this.taskStorageManagers = taskStorageManagers;
     this.systemConsumers = systemConsumers;
@@ -129,30 +129,31 @@ public class ContainerStorageManager {
    */
   private class TaskRestoreCallable implements Callable<Void> {
 
-    private TaskInstance taskInstance;
+    private TaskName taskName;
     private TaskStorageManager taskStorageManager;
     private SamzaContainerMetrics samzaContainerMetrics;
 
-    public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskInstance taskInstance,
+    public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
         TaskStorageManager taskStorageManager) {
       this.samzaContainerMetrics = samzaContainerMetrics;
-      this.taskInstance = taskInstance;
+      this.taskName = taskName;
       this.taskStorageManager = taskStorageManager;
     }
 
     @Override
     public Void call() {
       long startTime = System.currentTimeMillis();
-      LOG.info("Starting stores in task instance {}", this.taskInstance.taskName().getTaskName());
+      LOG.info("Starting stores in task instance {}", this.taskName.getTaskName());
       taskStorageManager.restoreStores();
       long timeToRestore = System.currentTimeMillis() - startTime;
-      Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics()
-          .getOrDefault(this.taskInstance.taskName().getTaskName(), null);
 
-      if (taskGauge != null) {
-        taskGauge.set(timeToRestore);
-      }
+      if (this.samzaContainerMetrics != null) {
+        Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName, null);
 
+        if (taskGauge != null) {
+          taskGauge.set(timeToRestore);
+        }
+      }
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index eca4673..760e358 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -28,14 +28,13 @@ import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.metrics.{Gauge, Timer}
+import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager}
 import org.apache.samza.system._
 import org.apache.samza.{Partition, SamzaContainerStatus}
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.mockito.Matchers.{any, notNull}
 import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
@@ -67,6 +66,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Mock
   private var samzaContainerListener: SamzaContainerListener = null
 
+  @Mock
+  private var containerStorageManager: ContainerStorageManager = null
+
   private var samzaContainer: SamzaContainer = null
 
   @Before
@@ -153,18 +155,6 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
-  def testStartStoresIncrementsCounter() {
-    when(this.taskInstance.taskName).thenReturn(TASK_NAME)
-    val restoreGauge = mock[Gauge[Long]]
-    when(this.metrics.taskStoreRestorationMetrics).thenReturn(Map(TASK_NAME -> restoreGauge))
-
-    this.samzaContainer.startStores
-    val restoreGaugeValueCaptor = ArgumentCaptor.forClass(classOf[Long])
-    verify(restoreGauge).set(restoreGaugeValueCaptor.capture())
-    assertTrue(restoreGaugeValueCaptor.getValue >= 1)
-  }
-
-  @Test
   def testApplicationContainerContext() {
     val orderVerifier = inOrder(this.applicationContainerContext, this.runLoop)
     this.samzaContainer.run
@@ -278,7 +268,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       this.producerMultiplexer,
       metrics,
       containerContext = this.containerContext,
-      applicationContainerContextOption = applicationContainerContext, containerStorageManager = null)
+      applicationContainerContextOption = applicationContainerContext, containerStorageManager = containerStorageManager)
     this.samzaContainer.setContainerListener(this.samzaContainerListener)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index d7c71fa..69bae4b 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -26,6 +26,7 @@ import org.apache.samza.container._
 import org.apache.samza.context.{ContainerContext, JobContext}
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.storage.ContainerStorageManager
 import org.apache.samza.system._
 import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
@@ -68,7 +69,7 @@ object StreamProcessorTestUtils {
       metrics = new SamzaContainerMetrics,
       containerContext = containerContext,
       applicationContainerContextOption = None,
-      containerStorageManager = null)
+      containerStorageManager = Mockito.mock(classOf[ContainerStorageManager]))
     container
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index dba8678..5e71efc 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.Gauge;
 import org.apache.samza.system.SystemConsumer;
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,7 +36,7 @@ public class TestContainerStorageManager {
 
   private ContainerStorageManager containerStorageManager;
   private Map<String, SystemConsumer> systemConsumers;
-  private Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+  private Map<TaskName, TaskStorageManager> taskStorageManagers;
   private SamzaContainerMetrics samzaContainerMetrics;
 
   private CountDownLatch taskStorageManagersRestoreStoreCount;
@@ -45,6 +46,8 @@ public class TestContainerStorageManager {
   private CountDownLatch systemConsumerStartCount;
   private CountDownLatch systemConsumerStopCount;
 
+  private Map<TaskName, Gauge<Object>> taskRestoreMetricGauges;
+
   /**
    * Utility method for creating a mocked taskInstance and taskStorageManager and adding it to the map.
    * @param taskname the desired taskname.
@@ -71,11 +74,15 @@ public class TestContainerStorageManager {
         return null;
       }).when(mockTaskStorageManager).restoreStores();
 
-    taskStorageManagers.put(mockTaskInstance, mockTaskStorageManager);
+    taskStorageManagers.put(new TaskName(taskname), mockTaskStorageManager);
+
+    Gauge testGauge = Mockito.mock(Gauge.class);
+    this.taskRestoreMetricGauges.put(new TaskName(taskname), testGauge);
   }
 
   @Before
   public void setUp() {
+    taskRestoreMetricGauges = new HashMap<>();
     systemConsumers = new HashMap<>();
     taskStorageManagers = new HashMap<>();
 
@@ -93,6 +100,7 @@ public class TestContainerStorageManager {
 
     // mock container metrics
     samzaContainerMetrics = Mockito.mock(SamzaContainerMetrics.class);
+    Mockito.when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges);
 
     // mock and setup sysconsumers
     SystemConsumer mockSystemConsumer = Mockito.mock(SystemConsumer.class);
@@ -112,7 +120,7 @@ public class TestContainerStorageManager {
   }
 
   @Test
-  public void testParallelism() {
+  public void testParallelismAndMetrics() {
     this.containerStorageManager.start();
     this.containerStorageManager.shutdown();
     Assert.assertTrue("init count should be 0", this.taskStorageManagersInitCount.getCount() == 0);
@@ -121,5 +129,10 @@ public class TestContainerStorageManager {
 
     Assert.assertTrue("systemConsumerStopCount count should be 0", this.systemConsumerStopCount.getCount() == 0);
     Assert.assertTrue("systemConsumerStartCount count should be 0", this.systemConsumerStartCount.getCount() == 0);
+
+    for (Gauge gauge : taskRestoreMetricGauges.values()) {
+      Assert.assertTrue("Restoration time gauge value should be invoked atleast once", Mockito.mockingDetails(gauge).getInvocations().size() >= 1);
+    }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 0b945cb..ffdceca 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -106,6 +106,9 @@ class TestTaskStorageManager extends MockitoSugar {
 
     taskManager.init
 
+    // mocking restore (issued by ContainerStorageManager)
+    mockStorageEngine.restore(mock[util.Iterator[IncomingMessageEnvelope]])
+
     assertTrue(storeFile.exists())
     assertFalse(offsetFile.exists())
     verify(mockSystemConsumer).register(ssp, "0")