Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/09/27 09:01:29 UTC

[GitHub] [ozone] sumitagrawl opened a new pull request, #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

sumitagrawl opened a new pull request, #3783:
URL: https://github.com/apache/ozone/pull/3783

   ## What changes were proposed in this pull request?
   
   A customized queue, "ContainerReportQueue", is created to limit the number of queued FCRs (full container reports) per datanode to one. Only the latest report is kept, and older duplicate FCRs are dropped.
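
The idea described above can be sketched roughly as follows. This is a simplified illustration, not the code from the patch: `ReportSketch` and `LatestFcrQueueSketch` are hypothetical names, reports are reduced to a datanode UUID, a full/incremental flag and a version number, and capacity limits and blocking behaviour are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical stand-in for a datanode report; not an Ozone class. */
final class ReportSketch {
  final String datanodeUuid;
  final boolean full;  // true = FCR, false = ICR
  final int version;   // only used to tell reports apart in the example

  ReportSketch(String datanodeUuid, boolean full, int version) {
    this.datanodeUuid = datanodeUuid;
    this.full = full;
    this.version = version;
  }
}

/** Keeps at most one pending FCR per datanode; every ICR is kept. */
final class LatestFcrQueueSketch {
  private final Queue<String> ordering = new ArrayDeque<>();
  private final Map<String, List<ReportSketch>> pending = new HashMap<>();
  private int dropped;

  synchronized void add(ReportSketch report) {
    List<ReportSketch> list =
        pending.computeIfAbsent(report.datanodeUuid, k -> new ArrayList<>());
    boolean replaced = false;
    if (report.full) {
      // Drop the older pending FCR from the same datanode, if there is one.
      for (int i = list.size() - 1; i >= 0; i--) {
        if (list.get(i).full) {
          list.remove(i);
          dropped++;
          replaced = true;
          break;
        }
      }
    }
    list.add(report);
    // A replacing FCR reuses the ordering slot of the report it displaced.
    if (!replaced) {
      ordering.add(report.datanodeUuid);
    }
  }

  synchronized ReportSketch poll() {
    String uuid = ordering.poll();
    if (uuid == null) {
      return null;
    }
    List<ReportSketch> list = pending.get(uuid);
    ReportSketch report = list.remove(0);
    if (list.isEmpty()) {
      pending.remove(uuid);
    }
    return report;
  }

  synchronized int size() {
    return ordering.size();
  }

  synchronized int droppedCount() {
    return dropped;
  }
}
```

With this sketch, adding two FCRs from the same datanode leaves a single pending entry for it, and the dropped counter records the discarded report.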
   
   ## What is the link to the Apache JIRA?
   
   https://issues.apache.org/jira/browse/HDDS-7244
   
   ## How was this patch tested?
   
   1. ThreadPool changes are tested using unit tests.
   2. ContainerReportQueue is tested using unit tests.
   3. SCM and Recon processing is tested end to end using IntelliJ and a debugger to simulate slow processing.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996471687


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java:
##########
@@ -518,6 +525,28 @@ private void initializeEventHandlers() {
 
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);

Review Comment:
   @adoroszlai This depends on the number of DNs, which cannot be determined during setup and can change.
   The related existing configuration is:
   1. Number of executor threads - thread.pool.size
   
   The queue size defaults to a maximum of 100,000. It may not need to be changed, since load can be handled by changing thread.pool.size.
   
   It is still exposed as a configuration, though.
   
   @ChenSammi Please share your opinion.
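
As a rough illustration of how the two settings discussed above could interact, the sketch below reads a pool size and a queue size with defaults and creates one bounded queue per executor thread. The one-queue-per-thread layout is an assumption suggested by the `List<BlockingQueue<...>>` return type in the diff, not something the diff confirms. The key names, the pool-size default, and the `ReportQueueConfigSketch` class are hypothetical, and a plain `Map` stands in for the real configuration object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ReportQueueConfigSketch {
  // Hypothetical key names and defaults, for illustration only.
  static final String POOL_SIZE_KEY = "example.container.report.thread.pool.size";
  static final String QUEUE_SIZE_KEY = "example.container.report.queue.size";
  static final int POOL_SIZE_DEFAULT = 10;
  static final int QUEUE_SIZE_DEFAULT = 100_000;

  private ReportQueueConfigSketch() {
  }

  /** Returns the integer stored under key, or defaultValue when the key is unset. */
  static int getInt(Map<String, String> conf, String key, int defaultValue) {
    String raw = conf.get(key);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }

  /** Creates one bounded queue per executor thread. */
  static List<BlockingQueue<String>> initQueues(Map<String, String> conf) {
    int poolSize = getInt(conf, POOL_SIZE_KEY, POOL_SIZE_DEFAULT);
    int queueSize = getInt(conf, QUEUE_SIZE_KEY, QUEUE_SIZE_DEFAULT);
    List<BlockingQueue<String>> queues = new ArrayList<>(poolSize);
    for (int i = 0; i < poolSize; i++) {
      queues.add(new LinkedBlockingQueue<>(queueSize));
    }
    return queues;
  }
}
```

Under these assumptions, the total number of reports that can be buffered is the pool size multiplied by the queue size, which is one reason why tuning only the pool size may be enough in practice.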





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983665624


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR and ICR (filter out deleted Container Report)
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    synchronized (this) {
+      return add(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {
+    synchronized (this) {
+      String uuid = orderingQueue.element();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public VALUE peek() {
+    synchronized (this) {
+      String uuid = orderingQueue.peek();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public void put(@NotNull VALUE value) throws InterruptedException {
+    while (!add(value)) {
+      Thread.currentThread().sleep(10);
+    }
+  }
+
+  @Override
+  public boolean offer(VALUE value, long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    long timeoutMillis = unit.toMillis(timeout);
+    while (timeoutMillis > 0) {
+      if (add(value)) {
+        return true;
+      }
+      Thread.currentThread().sleep(10);
+      timeoutMillis -= 10;
+    }
+    return false;
+  }
+
+  @NotNull
+  @Override
+  public VALUE take() throws InterruptedException {
+    String uuid = orderingQueue.take();
+    synchronized (this) {
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Nullable
+  @Override
+  public VALUE poll(long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    String uuid = orderingQueue.poll(timeout, unit);

Review Comment:
   Same as above. 





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983611244


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();

Review Comment:
   Please add a comment describing the function of this orderingQueue.





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996477783


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {

Review Comment:
   updated
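
The ordering-queue comment in the diff above says that reports from multiple datanodes are executed in the order they were added. A minimal sketch of that pairing of an ordering queue with a per-datanode map might look like this. `FairOrderingSketch` is a hypothetical class, reports are plain strings, and FCR replacement and capacity handling are omitted.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class FairOrderingSketch {
  // One UUID entry per queued report, in arrival order across all datanodes.
  private final Queue<String> orderingQueue = new ArrayDeque<>();
  // Pending reports grouped by datanode UUID.
  private final Map<String, Queue<String>> dataMap = new HashMap<>();

  synchronized void add(String datanodeUuid, String report) {
    dataMap.computeIfAbsent(datanodeUuid, k -> new ArrayDeque<>()).add(report);
    orderingQueue.add(datanodeUuid);
  }

  /** Returns the oldest pending report of the datanode whose turn is next. */
  synchronized String poll() {
    String uuid = orderingQueue.poll();
    if (uuid == null) {
      return null;
    }
    Queue<String> reports = dataMap.get(uuid);
    String report = reports.poll();
    if (reports.isEmpty()) {
      dataMap.remove(uuid);
    }
    return report;
  }
}
```

Because the ordering queue holds one entry per report rather than one per datanode, a datanode that sends many reports cannot starve the others: each of its entries waits its turn behind whatever was queued earlier.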





[GitHub] [ozone] sumitagrawl commented on pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on PR #3783:
URL: https://github.com/apache/ozone/pull/3783#issuecomment-1283220746

   @adoroszlai plz review




[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r985354869


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR and ICR (filter out deleted Container Report)
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    synchronized (this) {
+      return add(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {
+    synchronized (this) {
+      String uuid = orderingQueue.element();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public VALUE peek() {
+    synchronized (this) {
+      String uuid = orderingQueue.peek();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public void put(@NotNull VALUE value) throws InterruptedException {
+    while (!add(value)) {
+      Thread.currentThread().sleep(10);
+    }
+  }
+
+  @Override
+  public boolean offer(VALUE value, long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    long timeoutMillis = unit.toMillis(timeout);
+    while (timeoutMillis > 0) {
+      if (add(value)) {
+        return true;
+      }
+      Thread.currentThread().sleep(10);
+      timeoutMillis -= 10;
+    }
+    return false;
+  }
+
+  @NotNull
+  @Override
+  public VALUE take() throws InterruptedException {
+    String uuid = orderingQueue.take();

Review Comment:
   We cannot move this under synchronized, because orderingQueue.take() is blocking. If it were moved, other operations would also wait indefinitely, but operations such as add and capacity should still be allowed. So it is kept outside.
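
The locking concern above can be illustrated with a small sketch. It assumes a simplified queue where each key maps to a single value. `TakeOutsideLockSketch` is a hypothetical class, not the one in the patch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class TakeOutsideLockSketch {
  private final BlockingQueue<String> orderingQueue = new LinkedBlockingQueue<>();
  private final Map<String, String> dataMap = new HashMap<>();

  void add(String key, String value) {
    synchronized (this) {
      dataMap.put(key, value);
      orderingQueue.add(key);
    }
  }

  String take() throws InterruptedException {
    // Block here WITHOUT holding the monitor, so add() can still run
    // while the queue is empty.
    String key = orderingQueue.take();
    synchronized (this) {
      // The monitor is held only for the short map update.
      return dataMap.remove(key);
    }
  }
}
```

If `orderingQueue.take()` were called inside the `synchronized` block, a consumer waiting on an empty queue would hold the monitor, and `add()` could never acquire it to supply the element the consumer is waiting for.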



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR and ICR (filter out deleted Container Report)
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    synchronized (this) {
+      return add(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {
+    synchronized (this) {
+      String uuid = orderingQueue.element();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public VALUE peek() {
+    synchronized (this) {
+      String uuid = orderingQueue.peek();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public void put(@NotNull VALUE value) throws InterruptedException {
+    while (!add(value)) {
+      Thread.currentThread().sleep(10);
+    }
+  }
+
+  @Override
+  public boolean offer(VALUE value, long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    long timeoutMillis = unit.toMillis(timeout);
+    while (timeoutMillis > 0) {
+      if (add(value)) {
+        return true;
+      }
+      Thread.currentThread().sleep(10);
+      timeoutMillis -= 10;
+    }
+    return false;
+  }
+
+  @NotNull
+  @Override
+  public VALUE take() throws InterruptedException {
+    String uuid = orderingQueue.take();
+    synchronized (this) {
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Nullable
+  @Override
+  public VALUE poll(long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    String uuid = orderingQueue.poll(timeout, unit);

Review Comment:
   This cannot be moved under synchronized, because orderingQueue.poll() blocks for the whole timeout interval. If it were moved, other operations would have to wait as well, but operations such as add, capacity, ... should still be allowed. So it is kept outside.
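
   The locking pattern described above can be sketched as follows. This is a minimal illustration, not the PR's code: KeyedReportQueue and its String-based reports are hypothetical stand-ins for ContainerReportQueue and its report types. The timed wait runs without the monitor, and only the short map update is synchronized.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for ContainerReportQueue. */
class KeyedReportQueue {
  // Thread-safe on its own; holds one key per queued report.
  private final LinkedBlockingQueue<String> orderingQueue =
      new LinkedBlockingQueue<>();
  // Guarded by the monitor of "this".
  private final Map<String, Queue<String>> dataMap = new HashMap<>();

  boolean add(String key, String report) {
    synchronized (this) {
      dataMap.computeIfAbsent(key, k -> new ArrayDeque<>()).add(report);
      return orderingQueue.add(key);
    }
  }

  String poll(long timeout, TimeUnit unit) throws InterruptedException {
    // Blocking wait happens WITHOUT the monitor, so add() is not stalled
    // for the duration of the timeout.
    String key = orderingQueue.poll(timeout, unit);
    if (key == null) {
      return null; // timed out
    }
    // Only the short map update is done under the monitor.
    synchronized (this) {
      Queue<String> reports = dataMap.get(key);
      if (reports == null) {
        return null;
      }
      String report = reports.poll();
      if (reports.isEmpty()) {
        dataMap.remove(key);
      }
      return report;
    }
  }
}
```

   If the timed poll were inside the synchronized block, a consumer waiting on an empty queue would hold the monitor and block every producer calling add() until the timeout expired.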





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996477850


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }

Review Comment:
   Removed checkNotNull and used Objects.requireNonNull instead.
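
   For reference, a small sketch of that replacement. NullCheckSketch and its checked() helper are hypothetical names used only for illustration; java.util.Objects.requireNonNull is the standard JDK method and throws NullPointerException for a null argument, which is what the hand-written checkNotNull did.

```java
import java.util.Objects;

/** Hypothetical helper showing the JDK null check in place of checkNotNull. */
class NullCheckSketch {
  static <T> T checked(T value) {
    // Throws NullPointerException with the given message when value is null,
    // otherwise returns value unchanged.
    return Objects.requireNonNull(value, "value must not be null");
  }
}
```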



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  public boolean addValue(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        throw new IllegalStateException("capacity not available");
+      }
+
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }

Review Comment:
   removed.





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983604468


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -79,25 +90,39 @@
    * @param name Unique name used in monitoring and metrics.
    */
   public FixedThreadPoolWithAffinityExecutor(
-      String name,
-      List<ThreadPoolExecutor> executors) {
+      String name, EventHandler<P> eventHandler,
+      List<BlockingQueue<Q>> workQueues, EventPublisher eventPublisher,
+      Class<P> clazz, List<ThreadPoolExecutor> executors) {
     this.name = name;
+    this.eventHandler = eventHandler;
+    this.workQueues = workQueues;
+    this.eventPublisher = eventPublisher;
+    this.executors = executors;
+
+    EXECUTOR_MAP.put(clazz.getName(), this);
+
+    // Add runnable which will wait for task over another queue
+    // This needs terminate canceling each task in shutdown
+    int i = 0;
+    for (BlockingQueue<Q> queue : workQueues) {
+      ThreadPoolExecutor threadPoolExecutor = executors.get(i);
+      if (threadPoolExecutor.getQueue().size() == 0) {
+        threadPoolExecutor.submit(new ReportExecutor<>(queue, isRunning));
+      }
+      ++i;
+    }
+
     DefaultMetricsSystem.instance()

Review Comment:
   This metrics source is not unregistered in close(). I know it's an existing issue.





[GitHub] [ozone] sumitagrawl commented on pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on PR #3783:
URL: https://github.com/apache/ozone/pull/3783#issuecomment-1259217896

   @ChenSammi @nandakumar131 Please review.




[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983236023


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR and ICR (filter out deleted Container Report)

Review Comment:
   Only the FCR is removed here, not the ICR.
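
   The behaviour the quoted loop implements can be sketched as below. This is a simplified illustration under assumptions: FcrDedupSketch, Kind and addReport are hypothetical names, and reports are reduced to an enum tag instead of the real report classes. A new FCR drops at most one queued FCR for the same datanode; queued ICRs are left untouched.

```java
import java.util.List;

/** Hypothetical, simplified model of the per-datanode report list. */
class FcrDedupSketch {
  enum Kind { FCR, ICR }

  /**
   * Appends the incoming report to the pending list of one datanode.
   * Returns true when an older FCR was dropped to make room for a new FCR.
   */
  static boolean addReport(List<Kind> pending, Kind incoming) {
    boolean dropped = false;
    if (incoming == Kind.FCR) {
      // Scan from the tail: the most recently queued FCR is the one replaced.
      for (int i = pending.size() - 1; i >= 0; --i) {
        if (pending.get(i) == Kind.FCR) {
          pending.remove(i);
          dropped = true;
          break;
        }
      }
    }
    // ICRs are never filtered; every incoming report is appended.
    pending.add(incoming);
    return dropped;
  }
}
```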





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r999087035


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java:
##########
@@ -518,6 +525,28 @@ private void initializeEventHandlers() {
 
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);

Review Comment:
   As discussed, 





[GitHub] [ozone] ChenSammi merged pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi merged PR #3783:
URL: https://github.com/apache/ozone/pull/3783




[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996477938


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  public boolean addValue(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        throw new IllegalStateException("capacity not available");
+      }
+
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {
+    synchronized (this) {
+      String uuid = orderingQueue.element();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public VALUE peek() {
+    synchronized (this) {
+      String uuid = orderingQueue.peek();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public void put(@NotNull VALUE value) throws InterruptedException {
+    checkNotNull(value);
+    while (!addValue(value)) {
+      Thread.currentThread().sleep(10);
+    }
+  }
+
+  @Override
+  public boolean offer(VALUE value, long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    checkNotNull(value);
+    long timeoutMillis = unit.toMillis(timeout);
+    while (timeoutMillis > 0) {
+      if (addValue(value)) {
+        return true;
+      }
+      Thread.currentThread().sleep(10);
+      timeoutMillis -= 10;

Review Comment:
   Used the start time and end time to compute the actual elapsed time, and updated the code accordingly.
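
A minimal sketch of the elapsed-time approach mentioned in this comment, not the code from the PR. `TimedOfferSketch` and `offerWithDeadline` are hypothetical names, and the `BooleanSupplier` stands in for the queue's `addValue(value)` call. The point is that the remaining time comes from the clock rather than from subtracting a fixed 10 ms per loop iteration, so time spent inside the insert attempt and any sleep overshoot also count against the timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not taken from the PR: retries an insert until a deadline. */
final class TimedOfferSketch {
  private TimedOfferSketch() { }

  /**
   * Retries tryAdd until it succeeds or the timeout elapses.
   * The remaining time is derived from System.nanoTime() on every iteration.
   */
  static boolean offerWithDeadline(BooleanSupplier tryAdd, long timeout,
      TimeUnit unit) throws InterruptedException {
    final long pollNanos = TimeUnit.MILLISECONDS.toNanos(10);
    final long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (true) {
      if (tryAdd.getAsBoolean()) {
        return true;
      }
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        return false;
      }
      // Sleep for the poll interval, but never past the deadline.
      TimeUnit.NANOSECONDS.sleep(Math.min(remaining, pollNanos));
    }
  }
}
```

In a queue like the one under review, the supplier would presumably be `() -> addValue(value)`; that wiring is an assumption here.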





[GitHub] [ozone] adoroszlai commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996780015


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java:
##########
@@ -518,6 +525,28 @@ private void initializeEventHandlers() {
 
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);

Review Comment:
   @sumitagrawl Let me clarify:
   
   Existing code uses two separate configs for the two executors (one for `ContainerReport`, one for `IncrementalContainerReport`).  With this patch, those configs are no longer used, and a third one is being introduced with a more complex name (`...ContainerReport_or_IncrementalContainerReport...`).
   
   I have two concerns:
   1. existing configs are ignored
   2. the new name exposes internal implementation (that now we have a single thread pool instead of two) that the users may not care about
   
   Neither of those is a big deal, but maybe we could do better.
   
   One possible solution, using existing config:
    * Thread pool size could be the larger one of container report thread pool size and incremental container report thread pool size.
    * Queue size could be sum of container report queue size and incremental container report queue size.
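
The suggestion above could look roughly like the following sketch. It is only an illustration: a plain `Map<String, String>` stands in for the Ozone configuration object, and the key names and default values are placeholders rather than the real Ozone settings.

```java
import java.util.Map;

/** Hypothetical sketch: derive the merged executor settings from the two existing configs. */
final class MergedReportConfigSketch {
  // Placeholder key names and defaults; the real Ozone keys and defaults may differ.
  static final String FCR_POOL_KEY = "containerReport.thread.pool.size";
  static final String ICR_POOL_KEY = "incrementalContainerReport.thread.pool.size";
  static final String FCR_QUEUE_KEY = "containerReport.queue.size";
  static final String ICR_QUEUE_KEY = "incrementalContainerReport.queue.size";
  static final int POOL_DEFAULT = 10;
  static final int QUEUE_DEFAULT = 10000;

  private MergedReportConfigSketch() { }

  static int getInt(Map<String, String> conf, String key, int defaultValue) {
    String raw = conf.get(key);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }

  /** Thread pool size: the larger of the two existing pool sizes. */
  static int threadPoolSize(Map<String, String> conf) {
    return Math.max(getInt(conf, FCR_POOL_KEY, POOL_DEFAULT),
        getInt(conf, ICR_POOL_KEY, POOL_DEFAULT));
  }

  /** Queue size: the sum of the two existing queue sizes. */
  static int queueSize(Map<String, String> conf) {
    return getInt(conf, FCR_QUEUE_KEY, QUEUE_DEFAULT)
        + getInt(conf, ICR_QUEUE_KEY, QUEUE_DEFAULT);
  }
}
```

With this shape, users who already tuned the two existing settings would keep some influence over the merged executor without learning a new key.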





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996472833


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -260,6 +271,28 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
         reconTaskConfig));
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
+    List<BlockingQueue<ContainerReportBase>>
+        queues = new ArrayList<>();
+    for (int i = 0; i < threadPoolSize; ++i) {
+      queues.add(new ContainerReportQueue<>(queueSize));
+    }
+    return queues;
+  }
+

Review Comment:
   @adoroszlai I could not find a way to reuse this, as ReconStorageContainerManagerFacade is copied from SCM with many modifications and there is no shared utility class to put it in. I could expose it from ContainerReportQueue, but that does not seem like a good place for it.
   Please suggest a mechanism if there is one; otherwise let it stay like this, since it is effectively 6 lines of code.





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983604468


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -79,25 +90,39 @@
    * @param name Unique name used in monitoring and metrics.
    */
   public FixedThreadPoolWithAffinityExecutor(
-      String name,
-      List<ThreadPoolExecutor> executors) {
+      String name, EventHandler<P> eventHandler,
+      List<BlockingQueue<Q>> workQueues, EventPublisher eventPublisher,
+      Class<P> clazz, List<ThreadPoolExecutor> executors) {
     this.name = name;
+    this.eventHandler = eventHandler;
+    this.workQueues = workQueues;
+    this.eventPublisher = eventPublisher;
+    this.executors = executors;
+
+    EXECUTOR_MAP.put(clazz.getName(), this);
+
+    // Add runnable which will wait for task over another queue
+    // This needs terminate canceling each task in shutdown
+    int i = 0;
+    for (BlockingQueue<Q> queue : workQueues) {
+      ThreadPoolExecutor threadPoolExecutor = executors.get(i);
+      if (threadPoolExecutor.getQueue().size() == 0) {
+        threadPoolExecutor.submit(new ReportExecutor<>(queue, isRunning));
+      }
+      ++i;
+    }
+
     DefaultMetricsSystem.instance()

Review Comment:
   This metric is not unregistered in close().  I know it's existing code.





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983229971


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;

Review Comment:
   Can we make this configurable? 





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996473294


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }

Review Comment:
   @adoroszlai Yes, as discussed with Sammi:
   - If an ICR contains a container delete, that delete will not be included in the FCR.
     Currently there is no mechanism to recover if this notification is missed; one may need to be designed.
   
   So it is better not to remove ICR reports; they are small and quick to process.
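
The rule described here (a newer FCR replaces a queued FCR from the same datanode, while ICRs are always kept) can be sketched in isolation as below. This is a simplified illustration, not the `ContainerReportQueue` from the PR: `ReportDedupSketch`, `Report`, and `Kind` are hypothetical names, and the cross-datanode ordering queue and capacity limit are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: keep at most one queued FCR per datanode, never drop ICRs. */
final class ReportDedupSketch {
  enum Kind { FCR, ICR }

  static final class Report {
    final String datanodeId;
    final Kind kind;
    final int seq;

    Report(String datanodeId, Kind kind, int seq) {
      this.datanodeId = datanodeId;
      this.kind = kind;
      this.seq = seq;
    }
  }

  private final Map<String, List<Report>> pending = new HashMap<>();
  private int dropped;

  synchronized void add(Report report) {
    List<Report> list =
        pending.computeIfAbsent(report.datanodeId, k -> new ArrayList<>());
    if (report.kind == Kind.FCR) {
      // Scan from the newest entry; at most one FCR can be queued per datanode.
      for (int i = list.size() - 1; i >= 0; --i) {
        if (list.get(i).kind == Kind.FCR) {
          list.remove(i);
          dropped++;
          break;
        }
      }
    }
    // ICRs may carry container deletes that no FCR repeats, so they are only appended.
    list.add(report);
  }

  synchronized List<Report> pendingFor(String datanodeId) {
    List<Report> list = pending.get(datanodeId);
    return list == null ? Collections.<Report>emptyList() : new ArrayList<>(list);
  }

  synchronized int droppedCount() {
    return dropped;
  }
}
```

For example, adding an FCR, then an ICR, then a second FCR for the same datanode leaves the ICR followed by the second FCR, with one report counted as dropped.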





[GitHub] [ozone] ChenSammi commented on pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on PR #3783:
URL: https://github.com/apache/ozone/pull/3783#issuecomment-1296717748

   The last patch LGTM + 1.  
   Thanks @sumitagrawl for the feature improvement, and @adoroszlai for the code review. 




[GitHub] [ozone] adoroszlai commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996688994


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }

Review Comment:
   Thanks.





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r999087035


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java:
##########
@@ -518,6 +525,28 @@ private void initializeEventHandlers() {
 
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);

Review Comment:
   As discussed:
   1. ozone.scm.event.ContainerReport_or_IncrementalContainerReport.**thread.pool.size**  <-- existing config, previously used in FixedThreadPoolWithAffinityExecutor::initializeExecutorPool (now refactored to be used for queue creation)
   2. ozone.scm.event.ContainerReport_or_IncrementalContainerReport.**queue.size** <-- **added now to bound the queue size** (earlier it was unbounded)
   
   Each thread pool has one container report queue, which can handle reports from multiple DNs (both FCR and ICR).
   Each DN is associated with only one thread pool (and its queue), because FCR/ICR must be executed in the order they are received from the DN.
   So we use both configs:
   - Thread pool size determines the parallelism across multiple DNs.
   - Queue size is the maximum number of FCR/ICR from one or more DNs queued for one executor pool.
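
The relationship between the two settings can be illustrated with the sketch below. It is an assumption-laden illustration, not the executor from the PR: `AffinityQueuesSketch` is a hypothetical class, reports are plain strings, and choosing the queue by hashing the datanode UUID is an assumed way of pinning a DN to one queue (the thread does not say how the mapping is done).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: one bounded queue per worker, each datanode pinned to one queue. */
final class AffinityQueuesSketch {
  private final List<BlockingQueue<String>> queues = new ArrayList<>();

  AffinityQueuesSketch(int threadPoolSize, int queueSize) {
    // threadPoolSize controls parallelism across datanodes,
    // queueSize bounds how many reports may wait for each worker.
    for (int i = 0; i < threadPoolSize; ++i) {
      queues.add(new LinkedBlockingQueue<>(queueSize));
    }
  }

  /** Assumed mapping: the same datanode always lands on the same queue. */
  int indexFor(String datanodeUuid) {
    return Math.floorMod(datanodeUuid.hashCode(), queues.size());
  }

  /** Returns false when the datanode's queue is full. */
  boolean offer(String datanodeUuid, String report) {
    return queues.get(indexFor(datanodeUuid)).offer(report);
  }

  BlockingQueue<String> queue(int index) {
    return queues.get(index);
  }
}
```

Because every report from a given DN goes through a single FIFO queue drained by a single worker, the order in which that DN's FCR and ICR arrive is preserved, while different DNs can still be processed in parallel.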





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996477755


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;

Review Comment:
   added method in ContainerReport





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996477702


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java:
##########
@@ -265,11 +265,18 @@ public LayoutReportFromDatanode(DatanodeDetails datanodeDetails,
     }
   }
 
+  /**
+   * Container report payload base reference.
+   */
+  public interface ContainerReportBase {

Review Comment:
   Updated the class name.





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996477601


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java:
##########
@@ -63,6 +63,13 @@ void onMessage(EventHandler<PAYLOAD> handler,
    */
   long scheduledEvents();
 
+  /**
+   * Return the number of events scheduled to be processed.

Review Comment:
   updated comments





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r985412043


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR and ICR (filter out deleted Container Report)

Review Comment:
   updated comment





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r997148977


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -260,6 +271,28 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
         reconTaskConfig));
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
+    List<BlockingQueue<ContainerReportBase>>
+        queues = new ArrayList<>();
+    for (int i = 0; i < threadPoolSize; ++i) {
+      queues.add(new ContainerReportQueue<>(queueSize));
+    }
+    return queues;
+  }
+

Review Comment:
   Fixed by moving this to ScmUtils to avoid duplication.





[GitHub] [ozone] ChenSammi commented on pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on PR #3783:
URL: https://github.com/apache/ozone/pull/3783#issuecomment-1272793729

   Hi @kerneltime, would you like to take a look at this PR?




[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r984185738


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -154,15 +175,77 @@ public long scheduledEvents() {
     return scheduled.value();
   }
 
+  @Override
+  public long droppedEvents() {
+    return dropped.value();
+  }
+
   @Override
   public void close() {
+    isRunning.set(false);
     for (ThreadPoolExecutor executor : executors) {
       executor.shutdown();
     }
+    EXECUTOR_MAP.clear();
   }
 
   @Override
   public String getName() {
     return name;
   }
+
+  /**
+   * Runnable class to perform execution of payload.
+   */
+  public static class ReportExecutor<P> implements Runnable {
+    private BlockingQueue<P> queue;
+    private AtomicBoolean isRunning;
+
+    public ReportExecutor(BlockingQueue<P> queue, AtomicBoolean isRunning) {
+      this.queue = queue;
+      this.isRunning = isRunning;
+    }
+
+    @Override
+    public void run() {
+      while (isRunning.get()) {
+        try {
+          Object report = queue.poll(1, TimeUnit.MILLISECONDS);
+          if (report == null) {
+            continue;
+          }
+
+          FixedThreadPoolWithAffinityExecutor executor = EXECUTOR_MAP.get(
+              report.getClass().getName());
+          if (null == executor) {
+            continue;
+          }
+
+          executor.scheduled.incr();
+          try {
+            executor.eventHandler.onMessage(report,
+                executor.eventPublisher);
+            executor.done.incr();
+          } catch (Exception ex) {
+            LOG.error("Error on execution message {}", report, ex);
+            executor.failed.incr();
+          }
+          if (Thread.currentThread().isInterrupted()) {
+            LOG.warn("Interrupt of execution of Reports");
+            return;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupt of execution of Reports");
+          return;

Review Comment:
   Add a "Thread.currentThread().interrupt();" statement here to restore the interrupt status.
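
A minimal sketch of the suggested pattern, assuming a polling loop similar to the one in the diff. The class `PollingTask` and its `String` payload are hypothetical; only the `catch` block is the point of the example. A blocking call such as `poll(timeout, unit)` clears the thread's interrupt flag when it throws `InterruptedException`, so the handler sets it again before returning.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical polling task that restores the interrupt flag before exiting. */
final class PollingTask implements Runnable {
  private final BlockingQueue<String> queue;
  private final AtomicBoolean isRunning;

  PollingTask(BlockingQueue<String> queue, AtomicBoolean isRunning) {
    this.queue = queue;
    this.isRunning = isRunning;
  }

  @Override
  public void run() {
    while (isRunning.get()) {
      try {
        String report = queue.poll(1, TimeUnit.MILLISECONDS);
        if (report == null) {
          continue; // nothing queued yet, poll again
        }
        // A real task would hand the report to its event handler here.
      } catch (InterruptedException e) {
        // The flag was cleared when the exception was thrown; set it again
        // so that code further up the stack can still observe the interrupt.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}
```

Without the `interrupt()` call, the thread pool that owns the worker has no way to tell that the task ended because of an interrupt.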





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983607935


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -154,15 +175,77 @@ public long scheduledEvents() {
     return scheduled.value();
   }
 
+  @Override
+  public long droppedEvents() {
+    return dropped.value();
+  }
+
   @Override
   public void close() {
+    isRunning.set(false);
     for (ThreadPoolExecutor executor : executors) {
       executor.shutdown();
     }
+    EXECUTOR_MAP.clear();
   }
 
   @Override
   public String getName() {
     return name;
   }
+
+  /**
+   * Runnable class to perform execution of payload.
+   */
+  public static class ReportExecutor<P> implements Runnable {
+    private BlockingQueue<P> queue;
+    private AtomicBoolean isRunning;
+
+    public ReportExecutor(BlockingQueue<P> queue, AtomicBoolean isRunning) {
+      this.queue = queue;
+      this.isRunning = isRunning;
+    }
+
+    @Override
+    public void run() {
+      while (isRunning.get()) {
+        try {
+          Object report = queue.poll(1, TimeUnit.MILLISECONDS);
+          if (report == null) {
+            continue;
+          }
+
+          FixedThreadPoolWithAffinityExecutor executor = EXECUTOR_MAP.get(
+              report.getClass().getName());
+          if (null == executor) {
+            continue;

Review Comment:
   Add a warning level log here if executor cannot be found. 





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983627603


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR and ICR (filter out deleted Container Report)
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    synchronized (this) {
+      return add(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {

Review Comment:
   element() and remove() should throw NoSuchElementException if the queue is empty. Please double-check whether any other API should throw an exception too.
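
For reference, a small sketch of the `java.util.Queue` contract on an empty queue, using the JDK's own `LinkedBlockingQueue` rather than the class under review. The class `QueueContractDemo` and its helper are hypothetical names used only for illustration.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo of which Queue methods throw on an empty queue. */
final class QueueContractDemo {
  /** Returns true if the call threw NoSuchElementException. */
  static boolean throwsNoSuchElement(Runnable call) {
    try {
      call.run();
      return false;
    } catch (NoSuchElementException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Queue<String> empty = new LinkedBlockingQueue<>();
    System.out.println(empty.poll());                                 // null
    System.out.println(empty.peek());                                 // null
    System.out.println(throwsNoSuchElement(() -> empty.remove()));    // true
    System.out.println(throwsNoSuchElement(() -> empty.element()));   // true
  }
}
```

In the `Queue` interface, `remove()` and `element()` are the throwing counterparts of `poll()` and `peek()`, so a custom implementation is expected to keep that distinction.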







[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983607245


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -154,15 +175,77 @@ public long scheduledEvents() {
     return scheduled.value();
   }
 
+  @Override
+  public long droppedEvents() {
+    return dropped.value();
+  }
+
   @Override
   public void close() {
+    isRunning.set(false);
     for (ThreadPoolExecutor executor : executors) {
       executor.shutdown();
     }
+    EXECUTOR_MAP.clear();
   }
 
   @Override
   public String getName() {
     return name;
   }
+
+  /**
+   * Runnable class to perform execution of payload.
+   */
+  public static class ReportExecutor<P> implements Runnable {

Review Comment:
   Actually it's a task. Can we rename it to something like "ContainerReportProcessTask"? 





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983665031


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportQueue.class);
+
+  private static final Integer MAX_CAPACITY = 100000;
+
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR and ICR (filter out deleted Container Report)
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    synchronized (this) {
+      return add(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {
+    synchronized (this) {
+      String uuid = orderingQueue.element();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public VALUE peek() {
+    synchronized (this) {
+      String uuid = orderingQueue.peek();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public void put(@NotNull VALUE value) throws InterruptedException {
+    while (!add(value)) {
+      Thread.currentThread().sleep(10);
+    }
+  }
+
+  @Override
+  public boolean offer(VALUE value, long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    long timeoutMillis = unit.toMillis(timeout);
+    while (timeoutMillis > 0) {
+      if (add(value)) {
+        return true;
+      }
+      Thread.currentThread().sleep(10);
+      timeoutMillis -= 10;
+    }
+    return false;
+  }
+
+  @NotNull
+  @Override
+  public VALUE take() throws InterruptedException {
+    String uuid = orderingQueue.take();

Review Comment:
   Shall we move this statement under the `synchronized` block too?
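
One thing to weigh here: in the hunk quoted above, `add()` synchronizes on `this`, so calling the blocking `orderingQueue.take()` while holding that same monitor could stall producers whenever the queue is empty. A minimal sketch of an alternative, assuming a simplified hypothetical queue (`MonitorQueueSketch` is not part of the PR), is to block with `wait()`/`notifyAll()`, since `wait()` releases the monitor while the consumer sleeps:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified queue; not the ContainerReportQueue from the PR. */
class MonitorQueueSketch<T> {
  private final Deque<T> items = new ArrayDeque<>();

  /** Adds an item and wakes up any thread blocked in take(). */
  public synchronized void put(T item) {
    items.addLast(item);
    notifyAll();
  }

  /** Blocks until an item is available; wait() releases the monitor. */
  public synchronized T take() throws InterruptedException {
    while (items.isEmpty()) {
      wait();
    }
    return items.removeFirst();
  }

  public synchronized int size() {
    return items.size();
  }
}

class MonitorQueueDemo {
  public static void main(String[] args) throws Exception {
    MonitorQueueSketch<String> q = new MonitorQueueSketch<>();
    Thread consumer = new Thread(() -> {
      try {
        System.out.println("took " + q.take());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();
    q.put("dn-1");   // producer is never blocked by the waiting consumer
    consumer.join();
  }
}
```

With this shape the emptiness check and the removal happen under one lock, and a waiting consumer does not prevent `put()` from entering the monitor.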





[GitHub] [ozone] ChenSammi commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r983604468


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -79,25 +90,39 @@
    * @param name Unique name used in monitoring and metrics.
    */
   public FixedThreadPoolWithAffinityExecutor(
-      String name,
-      List<ThreadPoolExecutor> executors) {
+      String name, EventHandler<P> eventHandler,
+      List<BlockingQueue<Q>> workQueues, EventPublisher eventPublisher,
+      Class<P> clazz, List<ThreadPoolExecutor> executors) {
     this.name = name;
+    this.eventHandler = eventHandler;
+    this.workQueues = workQueues;
+    this.eventPublisher = eventPublisher;
+    this.executors = executors;
+
+    EXECUTOR_MAP.put(clazz.getName(), this);
+
+    // Add runnable which will wait for task over another queue
+    // This needs terminate canceling each task in shutdown
+    int i = 0;
+    for (BlockingQueue<Q> queue : workQueues) {
+      ThreadPoolExecutor threadPoolExecutor = executors.get(i);
+      if (threadPoolExecutor.getQueue().size() == 0) {
+        threadPoolExecutor.submit(new ReportExecutor<>(queue, isRunning));
+      }
+      ++i;
+    }
+
     DefaultMetricsSystem.instance()

Review Comment:
   This metrics source is not unregistered in `close()`. It's an existing issue.





[GitHub] [ozone] adoroszlai commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r991481353


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java:
##########
@@ -63,6 +63,13 @@ void onMessage(EventHandler<PAYLOAD> handler,
    */
   long scheduledEvents();
 
+  /**
+   * Return the number of events scheduled to be processed.

Review Comment:
   Nit: comment duplicated from `scheduledEvents`, please update it to reflect the new method.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }

Review Comment:
   Question: Do we need to keep prior ICRs when FCR is received?  Would we lose any information by dropping them?
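
For context, the loop quoted above removes only the most recent queued FCR for the datanode and leaves any queued ICRs in place. A minimal sketch of that behaviour, using hypothetical stand-in types (`PendingReportSketch` and `FcrCoalesceSketch` are illustrations, not classes from the PR):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a queued report. */
class PendingReportSketch {
  final boolean full;  // true = FCR, false = ICR
  final int seq;       // arrival order, for illustration only

  PendingReportSketch(boolean full, int seq) {
    this.full = full;
    this.seq = seq;
  }
}

class FcrCoalesceSketch {
  /**
   * Appends a new FCR, first dropping the most recent queued FCR if one
   * exists. Queued ICRs are kept. Returns true if an older FCR was dropped.
   */
  static boolean addFull(List<PendingReportSketch> pending,
      PendingReportSketch fcr) {
    boolean dropped = false;
    for (int i = pending.size() - 1; i >= 0; --i) {
      if (pending.get(i).full) {
        pending.remove(i);
        dropped = true;
        break;
      }
    }
    pending.add(fcr);
    return dropped;
  }

  public static void main(String[] args) {
    List<PendingReportSketch> pending = new ArrayList<>();
    pending.add(new PendingReportSketch(true, 1));
    pending.add(new PendingReportSketch(false, 2));
    boolean dropped = addFull(pending, new PendingReportSketch(true, 3));
    System.out.println("dropped=" + dropped + " size=" + pending.size());
  }
}
```

Whether the retained ICRs still carry information once a newer FCR is queued is exactly the open question; the sketch only makes the current behaviour explicit.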



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  public boolean addValue(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        throw new IllegalStateException("capacity not available");
+      }
+
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }

Review Comment:
   `orderingQueue.remove()` already throws `NoSuchElementException` if the queue is empty (it never returns `null`), so this check seems to be unnecessary.
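
A small sketch of the difference between the two `java.util.Queue` methods on an empty `LinkedBlockingQueue` (the class name `RemoveVsPollSketch` is only for illustration):

```java
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;

class RemoveVsPollSketch {
  /** Returns true if remove() on an empty queue throws. */
  static boolean removeThrowsOnEmpty() {
    LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
    try {
      q.remove();
      return false;
    } catch (NoSuchElementException e) {
      return true;
    }
  }

  /** Returns true if poll() on an empty queue returns null. */
  static boolean pollReturnsNullOnEmpty() {
    return new LinkedBlockingQueue<String>().poll() == null;
  }

  public static void main(String[] args) {
    System.out.println("remove() throws on empty: " + removeThrowsOnEmpty());
    System.out.println("poll() returns null on empty: "
        + pollReturnsNullOnEmpty());
  }
}
```

So a `null` check is only meaningful after `poll()` or `peek()`, not after `remove()` or `element()`.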



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -46,17 +43,26 @@
  * @param <P> the payload type of events
  */
 @Metrics(context = "EventQueue")
-public class FixedThreadPoolWithAffinityExecutor<P>
+public class FixedThreadPoolWithAffinityExecutor<P, Q>
     implements EventExecutor<P> {
 
   private static final String EVENT_QUEUE = "EventQueue";
 
   private static final Logger LOG =
       LoggerFactory.getLogger(FixedThreadPoolWithAffinityExecutor.class);
 
+  private static final Map<String, FixedThreadPoolWithAffinityExecutor>
+      EXECUTOR_MAP = new ConcurrentHashMap<>();
+
   private final String name;
 
-  private final List<ThreadPoolExecutor> executors;

Review Comment:
   Any reason for making this non-`final`?  Please also make new fields `final` where possible.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java:
##########
@@ -518,6 +525,28 @@ private void initializeEventHandlers() {
 
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);

Review Comment:
   Do we expect admins to use these new configs?  Can we just take sum or max of the individual pool/queue size configs?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;

Review Comment:
   We could avoid the need to cast by adding `getDatanodeDetails()` to the interface `ContainerReport(Base)`.
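
A minimal sketch of the idea, with hypothetical stand-in types (the real `ContainerReportBase` and `DatanodeDetails` are not reproduced here, and a plain `String` UUID stands in for the datanode details):

```java
/** Hypothetical stand-in for the shared report interface. */
interface ReportBaseSketch {
  String getDatanodeUuid();
}

/** Stand-in for a full container report (FCR). */
class FullReportSketch implements ReportBaseSketch {
  private final String uuid;

  FullReportSketch(String uuid) {
    this.uuid = uuid;
  }

  @Override
  public String getDatanodeUuid() {
    return uuid;
  }
}

/** Stand-in for an incremental container report (ICR). */
class IncrementalReportSketch implements ReportBaseSketch {
  private final String uuid;

  IncrementalReportSketch(String uuid) {
    this.uuid = uuid;
  }

  @Override
  public String getDatanodeUuid() {
    return uuid;
  }
}

class QueueKeySketch {
  /** Works for either report type without a cast. */
  static <V extends ReportBaseSketch> String keyOf(V report) {
    return report.getDatanodeUuid();
  }

  public static void main(String[] args) {
    System.out.println(keyOf(new FullReportSketch("dn-1")));
    System.out.println(keyOf(new IncrementalReportSketch("dn-2")));
  }
}
```

With the accessor on the common interface, the queue can derive its map key from any `VALUE` directly, and the `instanceof` checks are needed only where FCR and ICR handling actually differ.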



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {

Review Comment:
   `ContainerReportQueue` is generic, but it is only instantiated with `ContainerReportBase` as `VALUE`.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }

Review Comment:
   This logic is the same for both FCR and ICR, so it can be shared.
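
As a rough illustration of the sharing suggested above, the "no previous report for this datanode" branch could be pulled into one helper that both add paths call first. This is only a sketch: `SharedFirstReportSketch`, `addIfFirstReport`, `size` and `pendingDatanodes` are hypothetical names, and the class is a simplified stand-in that takes the datanode UUID as a plain string rather than the real report types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Simplified stand-in for ContainerReportQueue; all names are illustrative. */
class SharedFirstReportSketch<V> {
  private final Map<String, List<V>> dataMap = new HashMap<>();
  private final Queue<String> orderingQueue = new LinkedList<>();
  private int capacity = 0;

  /**
   * Hypothetical shared helper: handles the case where the datanode has
   * no queued report yet. Returns false if a report list already exists.
   */
  private synchronized boolean addIfFirstReport(String uuid, V val) {
    if (dataMap.containsKey(uuid)) {
      return false;
    }
    List<V> dataList = new ArrayList<>();
    dataList.add(val);
    dataMap.put(uuid, dataList);
    orderingQueue.add(uuid);
    ++capacity;
    return true;
  }

  /** ICR path: reuse the shared helper, otherwise append to the list. */
  synchronized boolean addIncrementalReport(String uuid, V val) {
    if (addIfFirstReport(uuid, val)) {
      return true;
    }
    dataMap.get(uuid).add(val);
    orderingQueue.add(uuid);
    ++capacity;
    return true;
  }

  synchronized int size() {
    return capacity;
  }

  synchronized int pendingDatanodes() {
    return dataMap.size();
  }
}
```

The FCR path would call the same helper and only then run its "drop the older FCR" loop, so the first-report branch exists in one place.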



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();

Review Comment:
   Nit: unnecessary.
   
   ```suggestion
   ```



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }

Review Comment:
   It would be nice to add a type (ICR/FCR) to the `ContainerReport(Base)` interface to avoid the need for `instanceof` checks.
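
One possible shape for the type tag mentioned above, as a sketch only. The enum `ContainerReportType`, the `getType()` method and the two small report classes are assumptions made for illustration; they are not taken from the patch.

```java
/** Illustrative only: a report type tag on the base interface. */
class ReportTypeSketch {
  /** Hypothetical enum distinguishing incremental and full reports. */
  enum ContainerReportType { ICR, FCR }

  /** Stand-in for the ContainerReport(Base) interface. */
  interface ContainerReport {
    ContainerReportType getType();
  }

  static final class FullReport implements ContainerReport {
    @Override
    public ContainerReportType getType() {
      return ContainerReportType.FCR;
    }
  }

  static final class IncrementalReport implements ContainerReport {
    @Override
    public ContainerReportType getType() {
      return ContainerReportType.ICR;
    }
  }

  /** Replaces an instanceof check with a comparison on the type tag. */
  static boolean isFcr(ContainerReport report) {
    return report.getType() == ContainerReportType.FCR;
  }
}
```

With a tag like this the queue could also `switch` on the type instead of chaining `if`/`else if` checks.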



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -260,6 +271,28 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
         reconTaskConfig));
   }
 
+  @NotNull
+  private List<BlockingQueue<ContainerReportBase>> initContainerReportQueue() {
+    int threadPoolSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".thread.pool.size",
+        OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = ozoneConfiguration.getInt(OZONE_SCM_EVENT_PREFIX +
+            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+            + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
+    List<BlockingQueue<ContainerReportBase>>
+        queues = new ArrayList<>();
+    for (int i = 0; i < threadPoolSize; ++i) {
+      queues.add(new ContainerReportQueue<>(queueSize));
+    }
+    return queues;
+  }
+

Review Comment:
   This seems to be duplicated from `StorageContainerManager`.  Can you please reuse it instead?
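
A minimal sketch of how the duplicated method might be shared between SCM and Recon, assuming the two sizes have already been read from configuration by the caller. `ReportQueueFactorySketch` and `initQueues` are hypothetical names, and the queue factory parameter stands in for `new ContainerReportQueue<>(queueSize)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.IntFunction;

/** Illustrative shared helper; not part of the patch. */
class ReportQueueFactorySketch {
  /**
   * Builds one queue per worker thread. The factory receives the
   * per-queue capacity and returns a fresh queue instance.
   */
  static <T> List<BlockingQueue<T>> initQueues(
      int threadPoolSize, int queueSize,
      IntFunction<BlockingQueue<T>> queueFactory) {
    List<BlockingQueue<T>> queues = new ArrayList<>(threadPoolSize);
    for (int i = 0; i < threadPoolSize; ++i) {
      queues.add(queueFactory.apply(queueSize));
    }
    return queues;
  }
}
```

Both callers would then keep only their own configuration lookup and delegate the loop to the shared helper.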



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  public boolean addValue(@NotNull VALUE value) {
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        return false;
+      }
+
+      if (isFCRReport(value)) {
+        return addContainerReport(value);
+      } else if (isICRReport(value)) {
+        return addIncrementalReport(value);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean add(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      if (remainingCapacity() == 0) {
+        throw new IllegalStateException("capacity not available");
+      }
+
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public boolean offer(@NotNull VALUE value) {
+    checkNotNull(value);
+    synchronized (this) {
+      return addValue(value);
+    }
+  }
+
+  @Override
+  public VALUE remove() {
+    synchronized (this) {
+      String uuid = orderingQueue.remove();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE poll() {
+    synchronized (this) {
+      String uuid = orderingQueue.poll();
+      return removeAndGet(uuid);
+    }
+  }
+
+  @Override
+  public VALUE element() {
+    synchronized (this) {
+      String uuid = orderingQueue.element();
+      if (null == uuid) {
+        throw new NoSuchElementException();
+      }
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public VALUE peek() {
+    synchronized (this) {
+      String uuid = orderingQueue.peek();
+      return getReport(uuid);
+    }
+  }
+
+  @Override
+  public void put(@NotNull VALUE value) throws InterruptedException {
+    checkNotNull(value);
+    while (!addValue(value)) {
+      Thread.currentThread().sleep(10);
+    }
+  }
+
+  @Override
+  public boolean offer(VALUE value, long timeout, @NotNull TimeUnit unit)
+      throws InterruptedException {
+    checkNotNull(value);
+    long timeoutMillis = unit.toMillis(timeout);
+    while (timeoutMillis > 0) {
+      if (addValue(value)) {
+        return true;
+      }
+      Thread.currentThread().sleep(10);
+      timeoutMillis -= 10;

Review Comment:
   Should it deduct the actual elapsed time instead of assuming that exactly 10 milliseconds have passed?
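
For illustration, a deadline-based variant of the loop could look like the sketch below. It measures elapsed time with `System.nanoTime()` rather than subtracting a fixed 10 ms per iteration. `TimedOfferSketch` and `offerWithDeadline` are hypothetical names, and the `BooleanSupplier` stands in for the `addValue(value)` call. Unlike the quoted loop, this version attempts the add at least once even for a zero timeout, which is an assumption about the desired behaviour.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Illustrative timed retry loop; not part of the patch. */
class TimedOfferSketch {
  private static final long POLL_NANOS = TimeUnit.MILLISECONDS.toNanos(10);

  static boolean offerWithDeadline(BooleanSupplier tryAdd, long timeout,
      TimeUnit unit) throws InterruptedException {
    // Fixed point in time after which we give up.
    final long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (true) {
      if (tryAdd.getAsBoolean()) {
        return true;
      }
      // Remaining time is recomputed from the clock on every pass.
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        return false;
      }
      // Never sleep past the deadline.
      TimeUnit.NANOSECONDS.sleep(Math.min(remaining, POLL_NANOS));
    }
  }
}
```

Because the remaining time comes from the clock, scheduling delays or a slow `addValue` call count against the timeout instead of silently extending it.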



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {

Review Comment:
   Please move `report.getDatanodeDetails()` and `dn.getUuidString()` outside of the `synchronized` block (same in `addContainerReport`).  Also store `uuidString` and reuse it below (in 4 other places).
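
A small sketch of the restructuring requested above, under simplifying assumptions: the nested `Report` interface is a hypothetical stand-in that exposes the UUID directly, and `UuidOutsideLockSketch` is not the real queue. The point is only that the key is computed once, before the lock is taken, and then reused.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: derive the map key before entering the lock. */
class UuidOutsideLockSketch {
  /** Hypothetical stand-in for a report that knows its datanode UUID. */
  interface Report {
    String getUuidString();
  }

  private final Map<String, List<Report>> dataMap = new HashMap<>();

  boolean add(Report report) {
    // Depends only on the report, so it needs no synchronization.
    final String uuidString = report.getUuidString();
    synchronized (this) {
      List<Report> dataList = dataMap.get(uuidString);
      if (dataList == null) {
        dataList = new ArrayList<>();
        dataMap.put(uuidString, dataList);
      }
      dataList.add(report);
    }
    return true;
  }

  synchronized int count(String uuidString) {
    List<Report> dataList = dataMap.get(uuidString);
    return dataList == null ? 0 : dataList.size();
  }
}
```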



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java:
##########
@@ -265,11 +265,18 @@ public LayoutReportFromDatanode(DatanodeDetails datanodeDetails,
     }
   }
 
+  /**
+   * Container report payload base reference.
+   */
+  public interface ContainerReportBase {

Review Comment:
   Can you please simplify to `ContainerReport`?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportBase;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor.IQueueMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Customized queue to handle FCR and ICR from datanode optimally,
+ * avoiding duplicate FCR reports.
+ */
+public class ContainerReportQueue<VALUE extends ContainerReportBase>
+    implements BlockingQueue<VALUE>, IQueueMetrics {
+
+  private final Integer maxCapacity;
+
+  /* ordering queue provides ordering of execution in fair manner
+   * i.e. report execution from multiple datanode will be executed in same
+   * order as added to queue.
+   */
+  private LinkedBlockingQueue<String> orderingQueue
+      = new LinkedBlockingQueue<>();
+  private Map<String, List<VALUE>> dataMap = new HashMap<>();
+
+  private int capacity = 0;
+
+  private AtomicInteger droppedCount = new AtomicInteger();
+
+  public ContainerReportQueue() {
+    this(100000);
+  }
+  
+  public ContainerReportQueue(int maxCapacity) {
+    super();
+    this.maxCapacity = maxCapacity;
+  }
+
+  private boolean addContainerReport(VALUE val) {
+    ContainerReportFromDatanode report
+        = (ContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. FCR report available
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      boolean isReportRemoved = false;
+      if (!dataList.isEmpty()) {
+        // remove FCR if present
+        for (int i = dataList.size() - 1; i >= 0; --i) {
+          ContainerReportBase reportInfo = dataList.get(i);
+          // if FCR, its last FCR report, remove directly
+          if (isFCRReport(reportInfo)) {
+            dataList.remove(i);
+            --capacity;
+            droppedCount.incrementAndGet();
+            isReportRemoved = true;
+            break;
+          }
+        }
+      }
+
+      dataList.add(val);
+      ++capacity;
+      if (!isReportRemoved) {
+        orderingQueue.add(dn.getUuidString());
+      }
+    }
+    return true;
+  }
+
+  private boolean addIncrementalReport(VALUE val) {
+    IncrementalContainerReportFromDatanode report
+        = (IncrementalContainerReportFromDatanode) val;
+    synchronized (this) {
+      // 1. check if no previous report available, else add the report
+      DatanodeDetails dn = report.getDatanodeDetails();
+      if (!dataMap.containsKey(dn.getUuidString())) {
+        ArrayList<VALUE> dataList = new ArrayList<>();
+        dataList.add(val);
+        ++capacity;
+        dataMap.put(dn.getUuidString(), dataList);
+        orderingQueue.add(dn.getUuidString());
+        return true;
+      }
+
+      // 2. Add ICR report or merge to previous ICR
+      List<VALUE> dataList = dataMap.get(dn.getUuidString());
+      dataList.add(val);
+      ++capacity;
+      orderingQueue.add(dn.getUuidString());
+    }
+    return true;
+  }
+
+  private boolean isFCRReport(Object report) {
+    return report instanceof ContainerReportFromDatanode;
+  }
+
+  private boolean isICRReport(Object report) {
+    return report instanceof IncrementalContainerReportFromDatanode;
+  }
+
+  private VALUE removeAndGet(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    VALUE report = null;
+    if (dataList != null && !dataList.isEmpty()) {
+      report = dataList.remove(0);
+      --capacity;
+      if (dataList.isEmpty()) {
+        dataMap.remove(uuid);
+      }
+    }
+    return report;
+  }
+
+  private VALUE getReport(String uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    List<VALUE> dataList = dataMap.get(uuid);
+    if (dataList != null && !dataList.isEmpty()) {
+      return dataList.get(0);
+    }
+    return null;
+  }
+
+  private static void checkNotNull(Object v) {
+    if (v == null) {
+      throw new NullPointerException();
+    }
+  }

Review Comment:
   Nit: why not use `Objects.requireNonNull` instead of defining your own method?
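
For reference, the standard-library call behaves like the hand-written `checkNotNull`: it throws `NullPointerException` on `null` and otherwise returns its argument. The wrapper class and method name below are hypothetical and exist only to make the sketch self-contained.

```java
import java.util.Objects;

/** Illustrative only: null check via the JDK instead of a custom helper. */
class RequireNonNullSketch {
  static <T> T checked(T value) {
    // Throws NullPointerException with the given message when value is null.
    return Objects.requireNonNull(value, "value");
  }
}
```

In the queue itself the call could replace `checkNotNull(value)` directly, for example as the first statement of `add`, `offer` and `put`.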





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on code in PR #3783:
URL: https://github.com/apache/ozone/pull/3783#discussion_r996477635


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java:
##########
@@ -46,17 +43,26 @@
  * @param <P> the payload type of events
  */
 @Metrics(context = "EventQueue")
-public class FixedThreadPoolWithAffinityExecutor<P>
+public class FixedThreadPoolWithAffinityExecutor<P, Q>
     implements EventExecutor<P> {
 
   private static final String EVENT_QUEUE = "EventQueue";
 
   private static final Logger LOG =
       LoggerFactory.getLogger(FixedThreadPoolWithAffinityExecutor.class);
 
+  private static final Map<String, FixedThreadPoolWithAffinityExecutor>
+      EXECUTOR_MAP = new ConcurrentHashMap<>();
+
   private final String name;
 
-  private final List<ThreadPoolExecutor> executors;

Review Comment:
   Added `final` to all new fields.





[GitHub] [ozone] sumitagrawl commented on pull request #3783: HDDS-7244. Fix multiple reports queued up from same DN and using up heap

Posted by GitBox <gi...@apache.org>.
sumitagrawl commented on PR #3783:
URL: https://github.com/apache/ozone/pull/3783#issuecomment-1290270482

   @kerneltime Please review

