You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2019/11/01 19:16:22 UTC
[hbase] branch branch-1.4 updated: HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time (#743)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new abeb598 HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time (#743)
abeb598 is described below
commit abeb5980a3b119434f1824cdc304eb2a917beceb
Author: bitterfox <bi...@gmail.com>
AuthorDate: Sat Nov 2 04:16:10 2019 +0900
HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time (#743)
* HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time
* Revert fix import order
---
.../apache/hadoop/hbase/client/AsyncProcess.java | 90 ++++++++++++++--------
.../hadoop/hbase/client/BufferedMutatorImpl.java | 21 +++--
.../hbase/client/ConnectionConfiguration.java | 48 +++++++++++-
.../hadoop/hbase/client/ConnectionManager.java | 13 +---
.../org/apache/hadoop/hbase/client/HTable.java | 8 +-
5 files changed, 121 insertions(+), 59 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 4c571e4..0c22473 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -323,35 +323,60 @@ class AsyncProcess {
this.id = COUNTER.incrementAndGet();
- this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
- if (configuredPauseForCQTBE < pause) {
- LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
- + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
- + ", will use " + pause + " instead.");
- this.pauseForCQTBE = pause;
- } else {
- this.pauseForCQTBE = configuredPauseForCQTBE;
- }
- this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ ConnectionConfiguration connConf =
+ hc.getConfiguration() == conf
+ ? hc.getConnectionConfiguration()
+ // Slow: parse conf in ConnectionConfiguration constructor
+ : new ConnectionConfiguration(conf);
+ if (connConf == null) {
+ // Slow: parse conf in ConnectionConfiguration constructor
+ connConf = new ConnectionConfiguration(conf);
+ }
+
+ this.pause = connConf.getPause();
+ this.pauseForCQTBE = connConf.getPauseForCQTBE();
+
+ this.numTries = connConf.getRetriesNumber();
this.rpcTimeout = rpcTimeout;
- this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
-
- this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
- this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
- this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
- this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
- DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
- this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
+ this.operationTimeout = connConf.getOperationTimeout();
+
+ // Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put
+ // Can be null when constructing hc's AsyncProcess or it's not reusable
+ AsyncProcess globalAsyncProcess = hc.getConfiguration() == conf ? hc.getAsyncProcess() : null;
+
+ this.primaryCallTimeoutMicroseconds =
+ globalAsyncProcess == null
+ ? conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000)
+ : globalAsyncProcess.primaryCallTimeoutMicroseconds;
+
+ this.maxTotalConcurrentTasks =
+ globalAsyncProcess == null
+ ? conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)
+ : globalAsyncProcess.maxTotalConcurrentTasks;
+ this.maxConcurrentTasksPerServer =
+ globalAsyncProcess == null
+ ? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS)
+ : globalAsyncProcess.maxConcurrentTasksPerServer;
+ this.maxConcurrentTasksPerRegion =
+ globalAsyncProcess == null
+ ? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS)
+ : globalAsyncProcess.maxConcurrentTasksPerRegion;
+ this.maxHeapSizePerRequest =
+ globalAsyncProcess == null
+ ? conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+ DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE)
+ : globalAsyncProcess.maxHeapSizePerRequest;
+ this.maxHeapSizeSubmit =
+ globalAsyncProcess == null
+ ? conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE)
+ : globalAsyncProcess.maxHeapSizeSubmit;
this.startLogErrorsCnt =
- conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+ globalAsyncProcess == null
+ ? conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT)
+ : globalAsyncProcess.startLogErrorsCnt;
if (this.maxTotalConcurrentTasks <= 0) {
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
@@ -387,11 +412,16 @@ class AsyncProcess {
this.rpcCallerFactory = rpcCaller;
this.rpcFactory = rpcFactory;
- this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
+ this.logBatchErrorDetails =
+ globalAsyncProcess == null
+ ? conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false)
+ : globalAsyncProcess.logBatchErrorDetails;
this.thresholdToLogUndoneTaskDetails =
- conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
- DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
+ globalAsyncProcess == null
+ ? conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+ DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS)
+ : globalAsyncProcess.thresholdToLogUndoneTaskDetails;
}
public void setRpcTimeout(int rpcTimeout) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index d207a82..41a1be7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -20,7 +20,6 @@ import java.io.Closeable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -99,18 +98,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.pool = params.getPool();
this.listener = params.getListener();
- ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
+ ConnectionConfiguration connConf = conn.getConnectionConfiguration();
+ if (connConf == null) {
+ // Slow: parse conf in ConnectionConfiguration constructor
+ connConf = new ConnectionConfiguration(conf);
+ }
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
- params.getWriteBufferSize() : tableConf.getWriteBufferSize();
+ params.getWriteBufferSize() : connConf.getWriteBufferSize();
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
- params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
-
- this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.operationTimeout = conn.getConfiguration().getInt(
- HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+ params.getMaxKeyValueSize() : connConf.getMaxKeyValueSize();
+
+ this.writeRpcTimeout = connConf.getWriteRpcTimeout();
+ this.operationTimeout = connConf.getOperationTimeout();
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 65ddc78..2002cea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -11,6 +11,8 @@
package org.apache.hadoop.hbase.client;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -27,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
public class ConnectionConfiguration {
+ static final Log LOG = LogFactory.getLog(ConnectionConfiguration.class);
public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
@@ -43,6 +46,10 @@ public class ConnectionConfiguration {
private final int metaReplicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
+ private final int readRpcTimeout;
+ private final int writeRpcTimeout;
+ private final long pause;
+ private final long pauseForCQTBE;
/**
* Constructor
@@ -75,9 +82,28 @@ public class ConnectionConfiguration {
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
this.retries = conf.getInt(
- HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+
+ this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
+ this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
+ if (configuredPauseForCQTBE < pause) {
+ LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ + ", will use " + pause + " instead.");
+ this.pauseForCQTBE = pause;
+ } else {
+ this.pauseForCQTBE = configuredPauseForCQTBE;
+ }
}
/**
@@ -98,6 +124,10 @@ public class ConnectionConfiguration {
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+ this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ this.pause = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
+ this.pauseForCQTBE = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
}
public long getWriteBufferSize() {
@@ -139,4 +169,20 @@ public class ConnectionConfiguration {
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}
+
+ public int getReadRpcTimeout() {
+ return readRpcTimeout;
+ }
+
+ public int getWriteRpcTimeout() {
+ return writeRpcTimeout;
+ }
+
+ public long getPause() {
+ return pause;
+ }
+
+ public long getPauseForCQTBE() {
+ return pauseForCQTBE;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 35ffa3e..11bad47 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -667,17 +667,8 @@ class ConnectionManager {
this.managed = managed;
this.connectionConfig = new ConnectionConfiguration(conf);
this.closed = false;
- this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
- if (configuredPauseForCQTBE < pause) {
- LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
- + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
- + ", will use " + pause + " instead.");
- this.pauseForCQTBE = pause;
- } else {
- this.pauseForCQTBE = configuredPauseForCQTBE;
- }
+ this.pause = connectionConfig.getPause();
+ this.pauseForCQTBE = connectionConfig.getPauseForCQTBE();
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS);
this.metaReplicaCallTimeoutScanInMicroSecond =
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 91a8f92..cbb7b01 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -366,12 +366,8 @@ public class HTable implements HTableInterface, RegionLocator {
}
this.operationTimeout = tableName.isSystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
- this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
- configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+ this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {