You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/10/15 10:12:50 UTC

[GitHub] [ozone] adoroszlai commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

adoroszlai commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r991481353


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java:
##########
@@ -63,6 +63,13 @@ void onMessage(EventHandler<PAYLOAD> handler,
    */
   long scheduledEvents();
 
+  /**
+   * Return the number of events scheduled to be processed.

Review Comment:
   Nit: this Javadoc comment is duplicated from `scheduledEvents`; please update it to describe the new method.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }

Review Comment:
   Question: Do we need to keep prior ICRs when an FCR is received?  Would we lose any information by dropping them?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  public boolean addValue(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        throw new IllegalStateException("capacity not available");
+      }
+
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }

Review Comment:
   `orderingQueue.remove()` already throws `NoSuchElementException` when the queue is empty and never returns `null`, so this check seems to be unnecessary.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -46,17 +43,26 @@
  * @param <P> the payload type of events
  */
 @Metrics(context = "EventQueue")
-public class FixedThreadPoolWithAffinityExecutor<P>
+public class FixedThreadPoolWithAffinityExecutor<P, Q>
     implements EventExecutor<P> {
 
   private static final String EVENT_QUEUE = "EventQueue";
 
   private static final Logger LOG =
       LoggerFactory.getLogger(FixedThreadPoolWithAffinityExecutor.class);
 
+  private static final Map<String, FixedThreadPoolWithAffinityExecutor>
+      EXECUTOR_MAP = new ConcurrentHashMap<>();
+
   private final String name;
 
-  private final List<ThreadPoolExecutor> executors;

Review Comment:
   Any reason for making this non-`final`?  Please also make new fields `final` where possible.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java:
##########
@@ -518,6 +525,28 @@ private void initializeEventHandlers() {
 
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);

Review Comment:
   Do we expect admins to use these new configs?  Can we just take the sum or max of the individual pool/queue size configs?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;

Review Comment:
   We could avoid the need to cast by adding `getDatanodeDetails()` to the interface `ContainerReport(Base)`.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {

Review Comment:
   `ContainerReportQueue` is generic, but it is only instantiated with `ContainerReportBase` as `VALUE`.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }

Review Comment:
   This logic is the same for both FCR and ICR, so it can be shared.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();

Review Comment:
   Nit: the explicit `super()` call is unnecessary.
   
   ```suggestion
   ```



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }

Review Comment:
   It would be nice to add a type (ICR/FCR) to the `ContainerReport(Base)` interface to avoid the need for `instanceof` checks.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -260,6 +271,28 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
         reconTaskConfig));
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
+    List<BlockingQueue<ContainerReportBase>>
+        queues = new ArrayList<>();
+    for (int i = 0; i < threadPoolSize; ++i) {
+      queues.add(new ContainerReportQueue<>(queueSize));
+    }
+    return queues;
+  }
+

Review Comment:
   Seems like this is duplicated from `StorageContainerManager`.  Can you please reuse instead?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  public boolean addValue(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        throw new IllegalStateException("capacity not available");
+      }
+
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {
+    synchronized (this) {
+      String uuid = orderingQueue.element();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public VALUE peek() {
+    synchronized (this) {
+      String uuid = orderingQueue.peek();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public void put(@NotNull VALUE value) throws InterruptedException {
+    checkNotNull(value);
+    while (!addValue(value)) {
+      Thread.currentThread().sleep(10);
+    }
+  }
+
+  @Override
+  public boolean offer(VALUE value, long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    checkNotNull(value);
+    long timeoutMillis = unit.toMillis(timeout);
+    while (timeoutMillis > 0) {
+      if (addValue(value)) {
+        return true;
+      }
+      Thread.currentThread().sleep(10);
+      timeoutMillis -= 10;

Review Comment:
   Should it deduct the actual elapsed time instead of assuming that exactly 10 milliseconds have passed?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {

Review Comment:
   Please move `report.getDatanodeDetails()` and `dn.getUuidString()` outside of the `synchronized` block (same in `addContainerReport`).  Also store `uuidString` and reuse it below (in 4 other places).



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java:
##########
@@ -265,11 +265,18 @@ public LayoutReportFromDatanode(DatanodeDetails datanodeDetails,
     }
   }
 
+  /**
+   * Container report payload base reference.
+   */
+  public interface ContainerReportBase {

Review Comment:
   Can you please simplify to `ContainerReport`?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }

Review Comment:
   Nit: why not use `Objects.requireNonNull` instead of defining your own method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org