You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ri...@apache.org on 2022/10/03 04:47:12 UTC

[ozone] branch master updated: HDDS-7229. Introduce container location cache in ScmClient (#3771)

This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd0fb4ce1 HDDS-7229. Introduce container location cache in ScmClient (#3771)
cdd0fb4ce1 is described below

commit cdd0fb4ce1e7a21c66f9550457b05af0f1228f8b
Author: Duong Nguyen <du...@gmail.com>
AuthorDate: Sun Oct 2 21:47:05 2022 -0700

    HDDS-7229. Introduce container location cache in ScmClient (#3771)
---
 .../protocol/StorageContainerLocationProtocol.java |   2 +-
 .../common/src/main/resources/ozone-default.xml    |  21 +++
 ...inerLocationProtocolClientSideTranslatorPB.java |   2 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |   2 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  11 ++
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |   5 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |   4 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   3 +-
 .../java/org/apache/hadoop/ozone/om/ScmClient.java |  81 ++++++++-
 .../org/apache/hadoop/ozone/om/OmTestManagers.java |   2 +-
 .../org/apache/hadoop/ozone/om/TestScmClient.java  | 191 +++++++++++++++++++++
 11 files changed, 314 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 51f2bb3f64..34bd2748f6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -116,7 +116,7 @@ public interface StorageContainerLocationProtocol extends Closeable {
    * @throws IOException
    */
   List<ContainerWithPipeline> getContainerWithPipelineBatch(
-      List<Long> containerIDs) throws IOException;
+      Iterable<? extends Long> containerIDs) throws IOException;
 
   /**
    * Ask SCM which containers of the given list exist.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 4ebef89f75..4d059d2d60 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3338,4 +3338,25 @@
       will create intermediate directories.
     </description>
   </property>
+
+  <property>
+    <name>ozone.om.container.location.cache.size</name>
+    <value>100000</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      The maximum number of entries in the container location cache in Ozone Manager. This cache allows Ozone Manager
+      to populate block locations in key-read responses without calling SCM, thus increasing Ozone Manager read performance.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.container.location.cache.ttl</name>
+    <value>360m</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      The time to live for entries of the container location cache in Ozone Manager.
+    </description>
+  </property>
+
+
 </configuration>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index b8e377def2..488d970cf2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -286,7 +286,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
    */
   @Override
   public List<ContainerWithPipeline> getContainerWithPipelineBatch(
-      List<Long> containerIDs) throws IOException {
+      Iterable<? extends Long> containerIDs) throws IOException {
     for (Long containerID: containerIDs) {
       Preconditions.checkState(containerID >= 0,
           "Container ID cannot be negative");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 5d6b357d65..4acb109e7d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -320,7 +320,7 @@ public class SCMClientProtocolServer implements
 
   @Override
   public List<ContainerWithPipeline> getContainerWithPipelineBatch(
-      List<Long> containerIDs) throws IOException {
+      Iterable<? extends Long> containerIDs) throws IOException {
     getScm().checkAdminAccess(null);
 
     List<ContainerWithPipeline> cpList = new ArrayList<>();
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 99d56d2b02..e72630a324 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -394,4 +394,15 @@ public final class OMConfigKeys {
   public static final TimeDuration
       OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT_DEFAULT
       = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+
+  public static final String OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE
+      = "ozone.om.container.location.cache.size";
+  public static final int OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE_DEFAULT
+      = 100_000;
+
+  public static final String OZONE_OM_CONTAINER_LOCATION_CACHE_TTL
+      = "ozone.om.container.location.cache.ttl";
+
+  public static final TimeDuration OZONE_OM_CONTAINER_LOCATION_CACHE_TTL_DEFAULT
+      = TimeDuration.valueOf(360, TimeUnit.MINUTES);
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 00136179f2..8c89f7c4e2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -253,14 +253,15 @@ public class TestKeyManagerImpl {
 
   private static void mockContainerClient() {
     ScmClient scmClient = new ScmClient(scm.getBlockProtocolServer(),
-        mockScmContainerClient);
+        mockScmContainerClient, conf);
     HddsWhiteboxTestUtils.setInternalState(keyManager,
         "scmClient", scmClient);
     HddsWhiteboxTestUtils.setInternalState(om,
         "scmClient", scmClient);
   }
   private static void mockBlockClient() {
-    ScmClient scmClient = new ScmClient(mockScmBlockLocationProtocol, null);
+    ScmClient scmClient = new ScmClient(mockScmBlockLocationProtocol, null,
+        conf);
     HddsWhiteboxTestUtils.setInternalState(keyManager,
         "scmClient", scmClient);
     HddsWhiteboxTestUtils.setInternalState(om,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 6f7229c288..3de7053287 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -179,7 +179,7 @@ public class KeyManagerImpl implements KeyManager {
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager,
       OMPerformanceMetrics metrics) {
-    this(null, new ScmClient(scmBlockClient, null), metadataManager,
+    this(null, new ScmClient(scmBlockClient, null, conf), metadataManager,
         conf, omId, secretManager, null, null, metrics);
   }
 
@@ -189,7 +189,7 @@ public class KeyManagerImpl implements KeyManager {
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager,
       OMPerformanceMetrics metrics) {
-    this(null, new ScmClient(scmBlockClient, scmContainerClient),
+    this(null, new ScmClient(scmBlockClient, scmContainerClient, conf),
         metadataManager, conf, omId, secretManager, null, null,
         metrics);
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 2898fc82fb..388fc2910f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -530,7 +530,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     scmContainerClient = getScmContainerClient(configuration);
     // verifies that the SCM info in the OM Version file is correct.
     scmBlockClient = getScmBlockClient(configuration);
-    this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
+    this.scmClient = new ScmClient(scmBlockClient, scmContainerClient,
+        configuration);
     this.ozoneLockProvider = new OzoneLockProvider(getKeyPathLockEnabled(),
         getEnableFileSystemPaths());
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index a3c2fd7794..dbc4ca9f4c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -17,9 +17,27 @@
 
 package org.apache.hadoop.ozone.om;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.update.client.SCMUpdateServiceGrpcClient;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_TTL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_TTL_DEFAULT;
 
 /**
  * Wrapper class for Scm protocol clients.
@@ -28,12 +46,49 @@ public class ScmClient {
 
   private final ScmBlockLocationProtocol blockClient;
   private final StorageContainerLocationProtocol containerClient;
+  private final LoadingCache<Long, Pipeline> containerLocationCache;
   private SCMUpdateServiceGrpcClient updateServiceGrpcClient;
 
   ScmClient(ScmBlockLocationProtocol blockClient,
-            StorageContainerLocationProtocol containerClient) {
+            StorageContainerLocationProtocol containerClient,
+            OzoneConfiguration configuration) {
     this.containerClient = containerClient;
     this.blockClient = blockClient;
+    this.containerLocationCache =
+        createContainerLocationCache(configuration, containerClient);
+  }
+
+  /**
+   * Builds the cache that maps a container ID to the pipeline returned by SCM
+   * for that container. Entries that are missing from the cache are loaded
+   * through the given container client.
+   *
+   * @param configuration supplies the maximum cache size and the entry TTL.
+   * @param containerClient SCM client used to load entries that are not
+   *                        cached.
+   * @return a loading cache keyed by container ID.
+   */
+  static LoadingCache<Long, Pipeline> createContainerLocationCache(
+      OzoneConfiguration configuration,
+      StorageContainerLocationProtocol containerClient) {
+    int maxSize = configuration.getInt(OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE,
+        OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE_DEFAULT);
+    // Values without a unit suffix are still interpreted in the default's
+    // unit, but the result is resolved in milliseconds so that a setting finer
+    // than that unit (e.g. "30s") is not truncated to zero.
+    TimeUnit defaultUnit =
+        OZONE_OM_CONTAINER_LOCATION_CACHE_TTL_DEFAULT.getUnit();
+    long ttlMillis = configuration.getTimeDuration(
+        OZONE_OM_CONTAINER_LOCATION_CACHE_TTL,
+        OZONE_OM_CONTAINER_LOCATION_CACHE_TTL_DEFAULT.getDuration(),
+        defaultUnit, TimeUnit.MILLISECONDS);
+    return CacheBuilder.newBuilder()
+        .maximumSize(maxSize)
+        .expireAfterWrite(ttlMillis, TimeUnit.MILLISECONDS)
+        .build(new CacheLoader<Long, Pipeline>() {
+          // Single-key load: one SCM call for one container.
+          @NotNull
+          @Override
+          public Pipeline load(@NotNull Long key) throws Exception {
+            return containerClient.getContainerWithPipeline(key).getPipeline();
+          }
+
+          // Bulk load: one batched SCM call for all requested containers,
+          // re-keyed by the container ID reported in each response entry.
+          @NotNull
+          @Override
+          public Map<Long, Pipeline> loadAll(
+              @NotNull Iterable<? extends Long> keys) throws Exception {
+            return containerClient.getContainerWithPipelineBatch(keys)
+                .stream()
+                .collect(Collectors.toMap(
+                    x -> x.getContainerInfo().getContainerID(),
+                    ContainerWithPipeline::getPipeline
+                ));
+          }
+        });
   }
 
   public ScmBlockLocationProtocol getBlockClient() {
@@ -52,4 +107,28 @@ public class ScmClient {
   public SCMUpdateServiceGrpcClient getUpdateServiceGrpcClient() {
     return updateServiceGrpcClient;
   }
+
+  /**
+   * Looks up the pipelines of the given containers through the container
+   * location cache.
+   *
+   * @param containerIds IDs of the containers to look up.
+   * @param forceRefresh when true, the cached entries of these containers are
+   *                     invalidated before the lookup.
+   * @return a mapping from container ID to pipeline.
+   * @throws IOException if loading a cache entry failed with an IOException.
+   */
+  public Map<Long, Pipeline> getContainerLocations(Iterable<Long> containerIds,
+                                                  boolean forceRefresh)
+      throws IOException {
+    if (forceRefresh) {
+      // Drop the cached entries first so the lookup below has to reload them.
+      containerLocationCache.invalidateAll(containerIds);
+    }
+    final Map<Long, Pipeline> locations;
+    try {
+      locations = containerLocationCache.getAll(containerIds);
+    } catch (ExecutionException e) {
+      return handleCacheExecutionException(e);
+    }
+    return locations;
+  }
+
+  /**
+   * Rethrows the failure wrapped by a cache {@link ExecutionException}.
+   * An {@link IOException} cause is rethrown as is; any other cause is wrapped
+   * in an {@link IllegalStateException}. This method never returns normally;
+   * the generic return type only lets callers write it as a return statement.
+   *
+   * @param e the exception thrown by the cache.
+   * @throws IOException if the cause of {@code e} is an IOException.
+   */
+  private <T> T handleCacheExecutionException(ExecutionException e)
+      throws IOException {
+    Throwable cause = e.getCause();
+    if (!(cause instanceof IOException)) {
+      throw new IllegalStateException("Unexpected exception accessing " +
+          "container location", cause);
+    }
+    throw (IOException) cause;
+  }
+
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
index 79de54efa3..17f575f43b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
@@ -101,7 +101,7 @@ public final class OmTestManagers {
 
     keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
         .getInternalState(om, "keyManager");
-    ScmClient scmClient = new ScmClient(scmBlockClient, containerClient);
+    ScmClient scmClient = new ScmClient(scmBlockClient, containerClient, conf);
     HddsWhiteboxTestUtils.setInternalState(om,
         "scmClient", scmClient);
     HddsWhiteboxTestUtils.setInternalState(keyManager,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestScmClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestScmClient.java
new file mode 100644
index 0000000000..061713c6e7
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestScmClient.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Arrays.asList;
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.apache.hadoop.hdds.client.ReplicationConfig.fromTypeAndFactor;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ScmClient}, run against mocked SCM protocol clients.
+ */
+public class TestScmClient {
+  private ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private StorageContainerLocationProtocol containerLocationProtocol;
+  private OzoneConfiguration conf;
+  private ScmClient scmClient;
+
+  /**
+   * Creates a new ScmClient backed by mocked protocol clients and a default
+   * configuration before each test.
+   */
+  @BeforeEach
+  public void setUp() {
+    scmBlockLocationProtocol = mock(ScmBlockLocationProtocol.class);
+    containerLocationProtocol = mock(StorageContainerLocationProtocol.class);
+    conf = new OzoneConfiguration();
+    scmClient = new ScmClient(scmBlockLocationProtocol,
+        containerLocationProtocol, conf);
+  }
+
+  /**
+   * Test cases for {@link #testGetContainerLocations}. The arguments are, in
+   * order: the test case name, the container IDs loaded into the cache up
+   * front, the container IDs requested by the call under test, the
+   * forceRefresh flag, and the container IDs expected to be requested from
+   * SCM by the call under test.
+   */
+  private static Stream<Arguments> getContainerLocationsTestCases() {
+    return Stream.of(
+        Arguments.of("Existing keys",
+            newHashSet(1L, 2L, 3L), newHashSet(2L, 3L), false, newHashSet()),
+
+        Arguments.of("New keys",
+            newHashSet(1L, 2L), newHashSet(3L, 4L),
+            false, newHashSet(3L, 4L)),
+
+        Arguments.of("Partial new keys",
+            newHashSet(1L, 2L), newHashSet(1L, 3L, 4L),
+            false, newHashSet(3L, 4L)),
+
+        Arguments.of("Existing keys with force refresh",
+            newHashSet(1L, 2L, 3L), newHashSet(2L, 3L),
+            true, newHashSet(2L, 3L)),
+
+        Arguments.of("New keys with force refresh",
+            newHashSet(1L, 2L), newHashSet(3L, 4L),
+            true, newHashSet(3L, 4L)),
+
+        Arguments.of("Partial new keys with force refresh",
+            newHashSet(1L, 2L), newHashSet(1L, 3L, 4L),
+            true, newHashSet(1L, 3L, 4L))
+    );
+  }
+  /**
+   * Populates the cache with {@code prepopulatedIds}, then requests
+   * {@code testContainerIds} and checks that the returned pipelines match the
+   * ones served by the mocked SCM and that SCM was asked exactly once for
+   * {@code expectedScmCallIds} (when that set is not empty).
+   */
+  @ParameterizedTest
+  @MethodSource("getContainerLocationsTestCases")
+  public void testGetContainerLocations(String testCaseName,
+                                       Set<Long> prepopulatedIds,
+                                       Set<Long> testContainerIds,
+                                       boolean forceRefresh,
+                                        Set<Long> expectedScmCallIds)
+      throws IOException {
+
+    // Latest location served by the mocked SCM for each container ID.
+    Map<Long, ContainerWithPipeline> actualLocations = new HashMap<>();
+
+    for (long containerId : prepopulatedIds) {
+      ContainerWithPipeline pipeline = createPipeline(containerId);
+      actualLocations.put(containerId, pipeline);
+    }
+
+    // Pre-populate the cache: the first lookup has to go to SCM once.
+    when(containerLocationProtocol
+        .getContainerWithPipelineBatch(eq(prepopulatedIds)))
+        .thenReturn(new ArrayList<>(actualLocations.values()));
+    Map<Long, Pipeline> locations =
+        scmClient.getContainerLocations(prepopulatedIds, false);
+    locations.forEach((id, pipeline) -> {
+      Assertions.assertEquals(actualLocations.get(id).getPipeline(), pipeline);
+    });
+    verify(containerLocationProtocol, times(1))
+        .getContainerWithPipelineBatch(prepopulatedIds);
+
+    // Second call: stub SCM with new pipelines for the IDs expected to be
+    // fetched, and record them as the locations the client should return.
+    if (!expectedScmCallIds.isEmpty()) {
+      List<ContainerWithPipeline> scmLocations = new ArrayList<>();
+      for (long containerId : expectedScmCallIds) {
+        ContainerWithPipeline pipeline = createPipeline(containerId);
+        scmLocations.add(pipeline);
+        actualLocations.put(containerId, pipeline);
+      }
+      when(containerLocationProtocol.getContainerWithPipelineBatch(
+          eq(expectedScmCallIds))).thenReturn(scmLocations);
+    }
+
+    locations = scmClient.getContainerLocations(testContainerIds, forceRefresh);
+    locations.forEach((id, pipeline) -> {
+      Assertions.assertEquals(actualLocations.get(id).getPipeline(), pipeline);
+    });
+
+    // SCM must have been asked exactly once for exactly the expected IDs.
+    if (!expectedScmCallIds.isEmpty()) {
+      verify(containerLocationProtocol, times(1))
+          .getContainerWithPipelineBatch(expectedScmCallIds);
+    }
+  }
+
+  /**
+   * Checks how SCM failures surface: an IOException is propagated as the same
+   * instance, and a runtime exception is reported as the cause of the
+   * RuntimeException thrown by the client.
+   */
+  @Test
+  public void testGetContainerLocationsWithScmFailures() throws IOException {
+    IOException ioException = new IOException("Exception");
+    when(containerLocationProtocol
+        .getContainerWithPipelineBatch(newHashSet(1L)))
+        .thenThrow(ioException);
+    IOException actual = Assertions.assertThrows(IOException.class,
+        () -> scmClient.getContainerLocations(newHashSet(1L), false));
+    Assertions.assertEquals(ioException, actual);
+
+    RuntimeException runtimeException = new IllegalStateException("Test");
+    when(containerLocationProtocol
+        .getContainerWithPipelineBatch(newHashSet(2L)))
+        .thenThrow(runtimeException);
+    RuntimeException actualRt = Assertions.assertThrows(RuntimeException.class,
+        () -> scmClient.getContainerLocations(newHashSet(2L), false));
+    Assertions.assertEquals(runtimeException, actualRt.getCause());
+  }
+
+  /**
+   * Builds a ContainerWithPipeline for the given container ID, using a new
+   * open RATIS/THREE pipeline with a random ID and two random datanodes.
+   */
+  ContainerWithPipeline createPipeline(long containerId) {
+    ContainerInfo containerInfo = new ContainerInfo.Builder()
+        .setContainerID(containerId)
+        .build();
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setNodes(asList(randomDatanode(), randomDatanode()))
+        .setReplicationConfig(fromTypeAndFactor(
+            ReplicationType.RATIS, ReplicationFactor.THREE))
+        .setState(Pipeline.PipelineState.OPEN)
+        .build();
+    return new ContainerWithPipeline(containerInfo, pipeline);
+  }
+
+  /**
+   * Builds a datanode with a random UUID and random five-letter strings as
+   * host name and IP address.
+   */
+  private DatanodeDetails randomDatanode() {
+    return DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID())
+        .setHostName(randomAlphabetic(5))
+        .setIpAddress(randomAlphabetic(5))
+        .build();
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org