You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/06 06:56:56 UTC

[linkis] branch dev-1.3.2 updated: Support ecm stimate actual memory (#4306)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 14ece3e10 Support ecm stimate actual memory (#4306)
14ece3e10 is described below

commit 14ece3e1030948d0358a9589244442b7cd82fbe9
Author: Casion <ca...@gmail.com>
AuthorDate: Mon Mar 6 14:56:49 2023 +0800

    Support ecm stimate actual memory (#4306)
    
    * ecm infer max memory and core
---
 .../linkis-engineconn-manager-server.md            |  1 +
 .../apache/linkis/common/utils/HardwareUtils.scala |  5 +++
 .../linkis/common/utils/HardwareUtilsTest.scala    |  5 +++
 .../linkis/ecm/server/conf/ECMConfiguration.scala  |  3 ++
 .../service/impl/DefaultECMHealthService.scala     | 12 +++----
 .../service/impl/DefaultECMRegisterService.scala   |  3 +-
 .../apache/linkis/ecm/server/util/ECMUtils.scala   | 41 +++++++++++++++++++++-
 7 files changed, 60 insertions(+), 10 deletions(-)

diff --git a/docs/configuration/linkis-engineconn-manager-server.md b/docs/configuration/linkis-engineconn-manager-server.md
index 825ea49c5..fff0465b8 100644
--- a/docs/configuration/linkis-engineconn-manager-server.md
+++ b/docs/configuration/linkis-engineconn-manager-server.md
@@ -8,6 +8,7 @@
 |linkis-engineconn-manager-server|wds.linkis.ecm.async.bus.consumer.size|30|bus.consumer.size|
 |linkis-engineconn-manager-server|wds.linkis.ecm.async.bus.max.free.time| 2m|bus.max.free.time|
 |linkis-engineconn-manager-server|wds.linkis.ecm.async.bus.waittoempty.time|5000L |bus.waittoempty.time|
+|linkis-engineconn-manager-server|linkis.ecm.stimate.actual.memory.enable|false |whether to estimate the ECM max memory from the machine's actual memory instead of using wds.linkis.ecm.memory.max|
 |linkis-engineconn-manager-server|wds.linkis.ecm.memory.max| 100g|ecm.memory.max |
 |linkis-engineconn-manager-server|wds.linkis.ecm.cores.max|100| ecm.cores.max |
 |linkis-engineconn-manager-server|wds.linkis.ecm.engineconn.instances.max| 50 |engineconn.instances.max|
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
index f87a6f94c..f9c5c2a08 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
@@ -40,6 +40,11 @@ object HardwareUtils {
     globalMemory.getTotal
   }
 
+  /** Returns the logical processor count reported by `hardware.getProcessor`. */
+  def getMaxLogicalCore(): Int = {
+    hardware.getProcessor.getLogicalProcessorCount
+  }
+
   /**
    * 1 total 2 available
    *
diff --git a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
index f6ba4de6f..4196c4228 100644
--- a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
+++ b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
@@ -31,6 +31,11 @@ class HardwareUtilsTest {
     assert(maxMemory > 1000L)
   }
 
+  @Test private[utils] def testGetMaxLogicalCore() = {
+    // The reported logical core count must be at least one.
+    assert(HardwareUtils.getMaxLogicalCore() >= 1)
+  }
+
   @Test private[utils] def testGetTotalAndAvailableMemory() = {
     val (maxMemory, availableMemory) = HardwareUtils.getTotalAndAvailableMemory()
     assert(maxMemory >= availableMemory)
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
index b6acc7b06..0c48d730a 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
@@ -46,6 +46,9 @@ object ECMConfiguration {
     CommonVars("wds.linkis.ecm.async.bus.waittoempty.time", 5000L).getValue
 
   // resource
+  // Switch read by ECMUtils.inferDefaultMemory(): when true, max memory is derived from
+  // HardwareUtils.getMaxMemory() instead of wds.linkis.ecm.memory.max. Defaults to false.
+  val ECM_STIMATE_ACTUAL_MEMORY_ENABLE: Boolean =
+    CommonVars[Boolean]("linkis.ecm.stimate.actual.memory.enable", false).getValue
+
   val ECM_MAX_MEMORY_AVAILABLE: Long =
     CommonVars[Long]("wds.linkis.ecm.memory.max", ByteTimeUtils.byteStringAsBytes("100g")).getValue
 
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
index f6c9c4cb4..132749cbe 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
@@ -26,6 +26,7 @@ import org.apache.linkis.ecm.server.conf.ECMConfiguration._
 import org.apache.linkis.ecm.server.listener.{ECMClosedEvent, ECMReadyEvent}
 import org.apache.linkis.ecm.server.report.DefaultECMHealthReport
 import org.apache.linkis.ecm.server.service.{ECMHealthService, EngineConnListService}
+import org.apache.linkis.ecm.server.util.ECMUtils
 import org.apache.linkis.manager.common.entity.enumeration.{NodeHealthy, NodeStatus}
 import org.apache.linkis.manager.common.entity.metrics.{NodeHealthyInfo, NodeOverLoadInfo}
 import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadInstanceResource}
@@ -44,14 +45,9 @@ import java.util.concurrent.TimeUnit
 
 class DefaultECMHealthService extends ECMHealthService with ECMEventListener {
 
-  private val maxResource = new LoadInstanceResource(
-    ECM_MAX_MEMORY_AVAILABLE,
-    ECM_MAX_CORES_AVAILABLE,
-    ECM_MAX_CREATE_INSTANCES
-  )
+  private val maxResource = ECMUtils.initMaxResource
 
-  private val minResource =
-    new LoadInstanceResource(ECM_PROTECTED_MEMORY, ECM_PROTECTED_CORES, ECM_PROTECTED_INSTANCES)
+  private val minResource = ECMUtils.initMinResource
 
   private var status: NodeStatus = NodeStatus.Starting
 
@@ -61,7 +57,7 @@ class DefaultECMHealthService extends ECMHealthService with ECMEventListener {
 
   private var lastCpuLoad: Double = 0d
 
-  private var lastFreeMemory: Long = ECM_MAX_MEMORY_AVAILABLE
+  private var lastFreeMemory: Long = ECMUtils.inferDefaultMemory()
 
   private val statusLocker = new Object()
 
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
index 0732ce1b5..eb9206c96 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
 import org.apache.linkis.ecm.server.conf.ECMConfiguration._
 import org.apache.linkis.ecm.server.listener.{ECMClosedEvent, ECMReadyEvent}
 import org.apache.linkis.ecm.server.service.ECMRegisterService
+import org.apache.linkis.ecm.server.util.ECMUtils
 import org.apache.linkis.manager.common.entity.resource._
 import org.apache.linkis.manager.common.protocol.em.{
   RegisterEMRequest,
@@ -62,7 +63,7 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener
 
   private def getEMRegiterResourceFromConfiguration: NodeResource = {
     val maxResource = new LoadInstanceResource(
-      ECM_MAX_MEMORY_AVAILABLE,
+      ECMUtils.inferDefaultMemory(),
       ECM_MAX_CORES_AVAILABLE,
       ECM_MAX_CREATE_INSTANCES
     )
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala
index 2cdd1fc7e..08addb94c 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala
@@ -19,8 +19,19 @@ package org.apache.linkis.ecm.server.util
 
 import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory}
 import org.apache.linkis.bml.protocol.BmlDownloadResponse
+import org.apache.linkis.common.utils.{HardwareUtils, Logging}
 import org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.FAILED_TO_DOWNLOAD
+import org.apache.linkis.ecm.server.conf.ECMConfiguration.{
+  ECM_MAX_CORES_AVAILABLE,
+  ECM_MAX_CREATE_INSTANCES,
+  ECM_MAX_MEMORY_AVAILABLE,
+  ECM_PROTECTED_CORES,
+  ECM_PROTECTED_INSTANCES,
+  ECM_PROTECTED_MEMORY,
+  ECM_STIMATE_ACTUAL_MEMORY_ENABLE
+}
 import org.apache.linkis.ecm.server.exception.ECMErrorException
+import org.apache.linkis.manager.common.entity.resource.LoadInstanceResource
 import org.apache.linkis.manager.common.protocol.bml.BmlResource
 import org.apache.linkis.rpc.Sender
 import org.apache.linkis.storage.fs.FileSystem
@@ -32,7 +43,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-object ECMUtils {
+object ECMUtils extends Logging {
 
   @volatile var bmlClient: BmlClient = _
   val lock = new Object()
@@ -76,6 +87,34 @@ object ECMUtils {
   private val address =
     Sender.getThisInstance.substring(0, Sender.getThisInstance.lastIndexOf(":"))
 
+  /**
+   * Maximum resource of this ECM. The memory part is resolved by `inferDefaultMemory()`;
+   * the core and instance limits are taken directly from configuration.
+   */
+  val initMaxResource: LoadInstanceResource = new LoadInstanceResource(
+    // Call with explicit parentheses: the method is declared as `inferDefaultMemory()`.
+    inferDefaultMemory(),
+    ECM_MAX_CORES_AVAILABLE,
+    ECM_MAX_CREATE_INSTANCES
+  )
+
+  /** Resource built from the ECM_PROTECTED_* configuration values (memory, cores, instances). */
+  val initMinResource: LoadInstanceResource =
+    new LoadInstanceResource(ECM_PROTECTED_MEMORY, ECM_PROTECTED_CORES, ECM_PROTECTED_INSTANCES)
+
   def getInstanceByPort(port: String): String = address + ":" + port
 
+  /**
+   * Resolves the maximum memory (in bytes) this ECM reports.
+   *
+   * When `ECM_STIMATE_ACTUAL_MEMORY_ENABLE` is true, the result is the larger of 90% of
+   * `HardwareUtils.getMaxMemory()` and `ECM_PROTECTED_MEMORY`; otherwise the configured
+   * `ECM_MAX_MEMORY_AVAILABLE` is returned unchanged.
+   *
+   * @return
+   *   the max memory in bytes
+   */
+  def inferDefaultMemory(): Long = {
+    if (ECM_STIMATE_ACTUAL_MEMORY_ENABLE) {
+      // Take 90% of the value reported by HardwareUtils, truncated to whole bytes.
+      val usableBytes = (HardwareUtils.getMaxMemory() * 0.9).toLong
+      // Never report less than the protected memory.
+      val resultMemory = math.max(usableBytes, ECM_PROTECTED_MEMORY)
+      logger.info(
+        s"Ecm protected memory:${ECM_PROTECTED_MEMORY} byte, 90% of ecm machine physical max memory:${usableBytes} byte, will use the larger one:${resultMemory}"
+      )
+      resultMemory
+    } else {
+      ECM_MAX_MEMORY_AVAILABLE
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org