You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/09/14 03:59:26 UTC
[05/50] [abbrv] hbase git commit: HBASE-18772 [JDK8] Replace
AtomicLong with LongAdder
HBASE-18772 [JDK8] Replace AtomicLong with LongAdder
Signed-off-by: Chia-Ping Tsai <ch...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eb5e4367
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eb5e4367
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eb5e4367
Branch: refs/heads/HBASE-18467
Commit: eb5e43673c3aa93a0eb7af82b79c764f351bfcf7
Parents: a66bd04
Author: Yechao Chen <ch...@gmail.com>
Authored: Wed Sep 13 04:46:14 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Wed Sep 13 06:09:51 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/SplitLogCounters.java | 114 +++++++++----------
.../SplitLogWorkerCoordination.java | 6 +-
.../ZKSplitLogManagerCoordination.java | 56 ++++-----
.../ZkSplitLogWorkerCoordination.java | 34 +++---
.../hadoop/hbase/io/hfile/LruBlockCache.java | 20 ++--
.../hbase/io/hfile/bucket/BucketAllocator.java | 6 +-
.../hbase/io/hfile/bucket/BucketCache.java | 37 +++---
.../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 16 +--
.../hadoop/hbase/master/SplitLogManager.java | 12 +-
.../apache/hadoop/hbase/mob/MobFileCache.java | 21 ++--
.../hadoop/hbase/regionserver/ChunkCreator.java | 8 +-
.../hadoop/hbase/regionserver/HRegion.java | 28 ++---
.../regionserver/MetricsRegionWrapperImpl.java | 12 +-
.../regionserver/RegionServerAccounting.java | 18 +--
.../handler/WALSplitterHandler.java | 2 +-
.../org/apache/hadoop/hbase/tool/Canary.java | 35 +++---
.../io/hfile/bucket/TestBucketWriterThread.java | 5 +-
.../master/TestDistributedLogSplitting.java | 19 ++--
.../hbase/master/TestSplitLogManager.java | 43 +++----
.../hbase/regionserver/TestSplitLogWorker.java | 16 +--
.../hadoop/hbase/thrift/IncrementCoalescer.java | 22 ++--
21 files changed, 268 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
index bde1b88..8913514 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.hbase;
* limitations under the License.
*/
import java.lang.reflect.Field;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -28,69 +28,69 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public class SplitLogCounters {
- //SplitLogManager counters
- public final static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
- public final static AtomicLong tot_mgr_log_split_batch_success = new AtomicLong(0);
- public final static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
- public final static AtomicLong tot_mgr_new_unexpected_wals = new AtomicLong(0);
- public final static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
- public final static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
- public final static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
- public final static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
- public final static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
- public final static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
- public final static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
- public final static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
- public final static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
- public final static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
- public final static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
- public final static AtomicLong tot_mgr_null_data = new AtomicLong(0);
- public final static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
- public final static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
- public final static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
- public final static AtomicLong tot_mgr_resubmit_threshold_reached = new AtomicLong(0);
- public final static AtomicLong tot_mgr_missing_state_in_delete = new AtomicLong(0);
- public final static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
- public final static AtomicLong tot_mgr_rescan = new AtomicLong(0);
- public final static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
- public final static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
- public final static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
- public final static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
- public final static AtomicLong tot_mgr_resubmit_dead_server_task = new AtomicLong(0);
- public final static AtomicLong tot_mgr_resubmit_force = new AtomicLong(0);
+ //SplitLogManager counters
+ public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder();
+ public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder();
+ public final static LongAdder tot_mgr_log_split_batch_err = new LongAdder();
+ public final static LongAdder tot_mgr_new_unexpected_wals = new LongAdder();
+ public final static LongAdder tot_mgr_log_split_start = new LongAdder();
+ public final static LongAdder tot_mgr_log_split_success = new LongAdder();
+ public final static LongAdder tot_mgr_log_split_err = new LongAdder();
+ public final static LongAdder tot_mgr_node_create_queued = new LongAdder();
+ public final static LongAdder tot_mgr_node_create_result = new LongAdder();
+ public final static LongAdder tot_mgr_node_already_exists = new LongAdder();
+ public final static LongAdder tot_mgr_node_create_err = new LongAdder();
+ public final static LongAdder tot_mgr_node_create_retry = new LongAdder();
+ public final static LongAdder tot_mgr_get_data_queued = new LongAdder();
+ public final static LongAdder tot_mgr_get_data_result = new LongAdder();
+ public final static LongAdder tot_mgr_get_data_nonode = new LongAdder();
+ public final static LongAdder tot_mgr_get_data_err = new LongAdder();
+ public final static LongAdder tot_mgr_get_data_retry = new LongAdder();
+ public final static LongAdder tot_mgr_node_delete_queued = new LongAdder();
+ public final static LongAdder tot_mgr_node_delete_result = new LongAdder();
+ public final static LongAdder tot_mgr_node_delete_err = new LongAdder();
+ public final static LongAdder tot_mgr_resubmit = new LongAdder();
+ public final static LongAdder tot_mgr_resubmit_failed = new LongAdder();
+ public final static LongAdder tot_mgr_null_data = new LongAdder();
+ public final static LongAdder tot_mgr_orphan_task_acquired = new LongAdder();
+ public final static LongAdder tot_mgr_wait_for_zk_delete = new LongAdder();
+ public final static LongAdder tot_mgr_unacquired_orphan_done = new LongAdder();
+ public final static LongAdder tot_mgr_resubmit_threshold_reached = new LongAdder();
+ public final static LongAdder tot_mgr_missing_state_in_delete = new LongAdder();
+ public final static LongAdder tot_mgr_heartbeat = new LongAdder();
+ public final static LongAdder tot_mgr_rescan = new LongAdder();
+ public final static LongAdder tot_mgr_rescan_deleted = new LongAdder();
+ public final static LongAdder tot_mgr_task_deleted = new LongAdder();
+ public final static LongAdder tot_mgr_resubmit_unassigned = new LongAdder();
+ public final static LongAdder tot_mgr_relist_logdir = new LongAdder();
+ public final static LongAdder tot_mgr_resubmit_dead_server_task = new LongAdder();
+ public final static LongAdder tot_mgr_resubmit_force = new LongAdder();
// SplitLogWorker counters
- public final static AtomicLong tot_wkr_failed_to_grab_task_no_data = new AtomicLong(0);
- public final static AtomicLong tot_wkr_failed_to_grab_task_exception = new AtomicLong(0);
- public final static AtomicLong tot_wkr_failed_to_grab_task_owned = new AtomicLong(0);
- public final static AtomicLong tot_wkr_failed_to_grab_task_lost_race = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_done = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_err = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
- public final static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
- public final static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
- public final static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
- public final static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
- public final static AtomicLong tot_wkr_final_transition_failed = new AtomicLong(0);
- public final static AtomicLong tot_wkr_task_grabing = new AtomicLong(0);
+ public final static LongAdder tot_wkr_failed_to_grab_task_no_data = new LongAdder();
+ public final static LongAdder tot_wkr_failed_to_grab_task_exception = new LongAdder();
+ public final static LongAdder tot_wkr_failed_to_grab_task_owned = new LongAdder();
+ public final static LongAdder tot_wkr_failed_to_grab_task_lost_race = new LongAdder();
+ public final static LongAdder tot_wkr_task_acquired = new LongAdder();
+ public final static LongAdder tot_wkr_task_resigned = new LongAdder();
+ public final static LongAdder tot_wkr_task_done = new LongAdder();
+ public final static LongAdder tot_wkr_task_err = new LongAdder();
+ public final static LongAdder tot_wkr_task_heartbeat = new LongAdder();
+ public final static LongAdder tot_wkr_task_acquired_rescan = new LongAdder();
+ public final static LongAdder tot_wkr_get_data_queued = new LongAdder();
+ public final static LongAdder tot_wkr_get_data_result = new LongAdder();
+ public final static LongAdder tot_wkr_get_data_retry = new LongAdder();
+ public final static LongAdder tot_wkr_preempt_task = new LongAdder();
+ public final static LongAdder tot_wkr_task_heartbeat_failed = new LongAdder();
+ public final static LongAdder tot_wkr_final_transition_failed = new LongAdder();
+ public final static LongAdder tot_wkr_task_grabing = new LongAdder();
public static void resetCounters() throws Exception {
Class<?> cl = SplitLogCounters.class;
for (Field fld : cl.getDeclaredFields()) {
/* Guard against source instrumentation. */
- if ((!fld.isSynthetic()) && (AtomicLong.class.isAssignableFrom(fld.getType()))) {
- ((AtomicLong)fld.get(null)).set(0);
+ if ((!fld.isSynthetic()) && (LongAdder.class.isAssignableFrom(fld.getType()))) {
+ ((LongAdder)fld.get(null)).reset();
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
index 5b26c49..ee83657 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -18,7 +18,7 @@
*/
package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
* {@link #isStop()} a flag indicates whether worker should finish <BR>
* {@link #registerListener()} called from {@link SplitLogWorker#run()} and could register listener
* for external changes in coordination (if required) <BR>
- * {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notify coordination engine that
+ * {@link #endTask(SplitLogTask, LongAdder, SplitTaskDetails)} notify coordination engine that
* <p>
* Important methods for WALSplitterHandler: <BR>
* splitting task has completed.
@@ -121,7 +121,7 @@ public interface SplitLogWorkerCoordination {
* @param splitTaskDetails details about log split task (specific to coordination engine being
* used).
*/
- void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails);
+ void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails splitTaskDetails);
/**
* Interface for log-split tasks Used to carry implementation details in encapsulated way through
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 1654c67..6017317 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -206,7 +206,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
if (task.unforcedResubmits.get() >= resubmitThreshold) {
if (!task.resubmitThresholdReached) {
task.resubmitThresholdReached = true;
- SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
LOG.info("Skipping resubmissions of task " + path + " because threshold "
+ resubmitThreshold + " reached");
}
@@ -215,7 +215,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
// race with heartbeat() that might be changing last_version
version = task.last_version;
} else {
- SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_force.increment();
version = -1;
}
LOG.info("resubmitting task " + path);
@@ -231,7 +231,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
task.setUnassigned();
rescan(Long.MAX_VALUE);
- SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit.increment();
return true;
}
@@ -273,7 +273,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
.getZooKeeper()
.getData(path, this.watcher, new GetDataAsyncCallback(),
Long.valueOf(-1) /* retry count */);
- SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_queued.increment();
}
/**
@@ -354,7 +354,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
private void deleteNode(String path, Long retries) {
- SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_delete_queued.increment();
// Once a task znode is ready for delete, that is it is in the TASK_DONE
// state, then no one should be writing to it anymore. That is no one
// will be updating the znode version any more.
@@ -370,9 +370,9 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
task = details.getTasks().remove(path);
if (task == null) {
if (ZKSplitLog.isRescanNode(watcher, path)) {
- SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
+ SplitLogCounters.tot_mgr_rescan_deleted.increment();
}
- SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
+ SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
LOG.debug("deleted task without in memory state " + path);
return;
}
@@ -380,7 +380,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
task.status = DELETED;
task.notify();
}
- SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
+ SplitLogCounters.tot_mgr_task_deleted.increment();
}
private void deleteNodeFailure(String path) {
@@ -389,7 +389,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
private void createRescanSuccess(String path) {
- SplitLogCounters.tot_mgr_rescan.incrementAndGet();
+ SplitLogCounters.tot_mgr_rescan.increment();
getDataSetWatch(path, zkretries);
}
@@ -416,7 +416,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
retry_count);
- SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_create_queued.increment();
return;
}
@@ -434,7 +434,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
private void getDataSetWatch(String path, Long retry_count) {
this.watcher.getRecoverableZooKeeper().getZooKeeper()
.getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
- SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_queued.increment();
}
@@ -446,7 +446,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
setDone(path, SUCCESS);
return;
}
- SplitLogCounters.tot_mgr_null_data.incrementAndGet();
+ SplitLogCounters.tot_mgr_null_data.increment();
LOG.fatal("logic error - got null data " + path);
setDone(path, FAILURE);
return;
@@ -497,17 +497,17 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
Task task = details.getTasks().get(path);
if (task == null) {
if (!ZKSplitLog.isRescanNode(watcher, path)) {
- SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
+ SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
LOG.debug("unacquired orphan task is done " + path);
}
} else {
synchronized (task) {
if (task.status == IN_PROGRESS) {
if (status == SUCCESS) {
- SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_success.increment();
LOG.info("Done splitting " + path);
} else {
- SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_err.increment();
LOG.warn("Error splitting " + path);
}
task.status = status;
@@ -536,7 +536,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
private Task findOrCreateOrphanTask(String path) {
return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
LOG.info("creating orphan task " + path);
- SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
+ SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
});
}
@@ -547,7 +547,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
LOG.info("task " + path + " acquired by " + workerName);
}
task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
- SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
+ SplitLogCounters.tot_mgr_heartbeat.increment();
} else {
// duplicate heartbeats - heartbeats w/o zk node version
// changing - are possible. The timeout thread does
@@ -898,7 +898,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
LOG.debug("failed to resubmit task " + path + " version changed");
return false;
} catch (KeeperException e) {
- SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_failed.increment();
LOG.warn("failed to resubmit " + path, e);
return false;
}
@@ -947,7 +947,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_create_result.increment();
if (rc != 0) {
if (needAbandonRetries(rc, "Create znode " + path)) {
createNodeFailure(path);
@@ -961,16 +961,16 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
// And all code pieces correctly handle the case of suddenly
// disappearing task-znode.
LOG.debug("found pre-existing znode " + path);
- SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_already_exists.increment();
} else {
Long retry_count = (Long) ctx;
LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path
+ " remaining retries=" + retry_count);
if (retry_count == 0) {
- SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_create_err.increment();
createNodeFailure(path);
} else {
- SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_create_retry.increment();
createNode(path, retry_count - 1);
}
return;
@@ -988,13 +988,13 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_result.increment();
if (rc != 0) {
if (needAbandonRetries(rc, "GetData from znode " + path)) {
return;
}
if (rc == KeeperException.Code.NONODE.intValue()) {
- SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_nonode.increment();
LOG.warn("task znode " + path + " vanished or not created yet.");
// ignore since we should not end up in a case where there is in-memory task,
// but no znode. The only case is between the time task is created in-memory
@@ -1011,10 +1011,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
+ " remaining retries=" + retry_count);
if (retry_count == 0) {
- SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_err.increment();
getDataSetWatchFailure(path);
} else {
- SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
+ SplitLogCounters.tot_mgr_get_data_retry.increment();
getDataSetWatch(path, retry_count - 1);
}
return;
@@ -1036,14 +1036,14 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@Override
public void processResult(int rc, String path, Object ctx) {
- SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_delete_result.increment();
if (rc != 0) {
if (needAbandonRetries(rc, "Delete znode " + path)) {
details.getFailedDeletions().add(path);
return;
}
if (rc != KeeperException.Code.NONODE.intValue()) {
- SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_node_delete_err.increment();
Long retry_count = (Long) ctx;
LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path
+ " remaining retries=" + retry_count);
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 354f581..a2f1799 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -156,7 +156,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
LOG.info("retrying data watch on " + path);
- SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
+ SplitLogCounters.tot_wkr_get_data_retry.increment();
getDataSetWatchAsync();
} else {
// no point setting a watch on the task which this worker is not
@@ -169,7 +169,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
public void getDataSetWatchAsync() {
watcher.getRecoverableZooKeeper().getZooKeeper()
.getData(currentTask, watcher, new GetDataAsyncCallback(), null);
- SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
+ SplitLogCounters.tot_wkr_get_data_queued.increment();
}
void getDataSetWatchSuccess(String path, byte[] data) {
@@ -221,12 +221,12 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
try {
try {
if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
- SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
return;
}
} catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e);
- SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
return;
}
SplitLogTask slt;
@@ -234,11 +234,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + path, e);
- SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
return;
}
if (!slt.isUnassigned()) {
- SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
return;
}
@@ -246,7 +246,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
attemptToOwnTask(true, watcher, server.getServerName(), path,
slt.getMode(), stat.getVersion());
if (currentVersion < 0) {
- SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
+ SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
return;
}
@@ -262,7 +262,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
}
LOG.info("worker " + server.getServerName() + " acquired task " + path);
- SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_acquired.increment();
getDataSetWatchAsync();
submitTask(path, slt.getMode(), currentVersion, reportPeriod);
@@ -371,11 +371,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + task);
- SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
return FAILED_TO_OWN_TASK;
}
latestZKVersion = stat.getVersion();
- SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_heartbeat.increment();
return latestZKVersion;
} catch (KeeperException e) {
if (!isFirstTime) {
@@ -392,7 +392,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
+ StringUtils.stringifyException(e1));
Thread.currentThread().interrupt();
}
- SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
return FAILED_TO_OWN_TASK;
}
@@ -440,7 +440,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
return;
}
}
- SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
+ SplitLogCounters.tot_wkr_task_grabing.increment();
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq.get()) {
taskReadyLock.wait(checkInterval);
@@ -567,7 +567,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
+ SplitLogCounters.tot_wkr_get_data_result.increment();
if (rc != 0) {
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
getDataSetWatchFailure(path);
@@ -588,14 +588,14 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
* @param ctr
*/
@Override
- public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) {
+ public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) {
ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
String task = zkDetails.getTaskNode();
int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
try {
if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
LOG.info("successfully transitioned task " + task + " to final state " + slt);
- ctr.incrementAndGet();
+ ctr.increment();
return;
}
LOG.warn("failed to transistion task " + task + " to end state " + slt
@@ -609,7 +609,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
} catch (KeeperException e) {
LOG.warn("failed to end task, " + task + " " + slt, e);
}
- SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+ SplitLogCounters.tot_wkr_final_transition_failed.increment();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 56996a4..c887d0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.io.hfile;
import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
@@ -32,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
@@ -178,13 +178,13 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
private final AtomicLong size;
/** Current size of data blocks */
- private final AtomicLong dataBlockSize;
+ private final LongAdder dataBlockSize;
/** Current number of cached elements */
private final AtomicLong elements;
/** Current number of cached data block elements */
- private final AtomicLong dataBlockElements;
+ private final LongAdder dataBlockElements;
/** Cache access count (sequential ID) */
private final AtomicLong count;
@@ -321,8 +321,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
this.stats = new CacheStats(this.getClass().getSimpleName());
this.count = new AtomicLong(0);
this.elements = new AtomicLong(0);
- this.dataBlockElements = new AtomicLong(0);
- this.dataBlockSize = new AtomicLong(0);
+ this.dataBlockElements = new LongAdder();
+ this.dataBlockSize = new LongAdder();
this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
this.size = new AtomicLong(this.overhead);
this.hardCapacityLimitFactor = hardLimitFactor;
@@ -409,7 +409,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
map.put(cacheKey, cb);
long val = elements.incrementAndGet();
if (buf.getBlockType().isData()) {
- dataBlockElements.incrementAndGet();
+ dataBlockElements.increment();
}
if (LOG.isTraceEnabled()) {
long size = map.size();
@@ -462,7 +462,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
heapsize *= -1;
}
if (bt != null && bt.isData()) {
- dataBlockSize.addAndGet(heapsize);
+ dataBlockSize.add(heapsize);
}
return size.addAndGet(heapsize);
}
@@ -569,7 +569,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
assertCounterSanity(size, val);
}
if (block.getBuffer().getBlockType().isData()) {
- dataBlockElements.decrementAndGet();
+ dataBlockElements.decrement();
}
if (evictedByEvictionProcess) {
// When the eviction of the block happened because of invalidation of HFiles, no need to
@@ -844,7 +844,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
@Override
public long getCurrentDataSize() {
- return this.dataBlockSize.get();
+ return this.dataBlockSize.sum();
}
@Override
@@ -864,7 +864,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
@Override
public long getDataBlockCount() {
- return this.dataBlockElements.get();
+ return this.dataBlockElements.sum();
}
EvictionThread getEvictionThread() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index b4c5a44..715cd86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -27,7 +27,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.MinMaxPriorityQueue;
import org.apache.commons.collections4.map.LinkedMap;
@@ -347,7 +347,7 @@ public final class BucketAllocator {
* @throws BucketAllocatorException
*/
BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
- AtomicLong realCacheSize) throws BucketAllocatorException {
+ LongAdder realCacheSize) throws BucketAllocatorException {
this(availableSpace, bucketSizes);
// each bucket has an offset, sizeindex. probably the buckets are too big
@@ -398,7 +398,7 @@ public final class BucketAllocator {
bsi.instantiateBucket(b);
reconfigured[bucketNo] = true;
}
- realCacheSize.addAndGet(foundLen);
+ realCacheSize.add(foundLen);
buckets[bucketNo].addAllocation(foundOffset);
usedSize += buckets[bucketNo].getItemAllocationSize();
bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index aca8d7f..26a03fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -165,13 +166,13 @@ public class BucketCache implements BlockCache, HeapSize {
private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
- private final AtomicLong realCacheSize = new AtomicLong(0);
- private final AtomicLong heapSize = new AtomicLong(0);
+ private final LongAdder realCacheSize = new LongAdder();
+ private final LongAdder heapSize = new LongAdder();
/** Current number of cached elements */
- private final AtomicLong blockNumber = new AtomicLong(0);
+ private final LongAdder blockNumber = new LongAdder();
/** Cache access count (sequential ID) */
- private final AtomicLong accessCount = new AtomicLong(0);
+ private final AtomicLong accessCount = new AtomicLong();
private static final int DEFAULT_CACHE_WAIT_TIME = 50;
// Used in test now. If the flag is false and the cache speed is very fast,
@@ -469,8 +470,8 @@ public class BucketCache implements BlockCache, HeapSize {
ramCache.remove(cacheKey);
cacheStats.failInsert();
} else {
- this.blockNumber.incrementAndGet();
- this.heapSize.addAndGet(cachedItem.heapSize());
+ this.blockNumber.increment();
+ this.heapSize.add(cachedItem.heapSize());
blocksByHFile.add(cacheKey);
}
}
@@ -545,10 +546,10 @@ public class BucketCache implements BlockCache, HeapSize {
@VisibleForTesting
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
bucketAllocator.freeBlock(bucketEntry.offset());
- realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+ realCacheSize.add(-1 * bucketEntry.getLength());
blocksByHFile.remove(cacheKey);
if (decrementBlockNumber) {
- this.blockNumber.decrementAndGet();
+ this.blockNumber.decrement();
}
}
@@ -591,8 +592,8 @@ public class BucketCache implements BlockCache, HeapSize {
private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
if (removedBlock != null) {
- this.blockNumber.decrementAndGet();
- this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
+ this.blockNumber.decrement();
+ this.heapSize.add(-1 * removedBlock.getData().heapSize());
}
return removedBlock;
}
@@ -689,7 +690,7 @@ public class BucketCache implements BlockCache, HeapSize {
}
public long getRealCacheSize() {
- return this.realCacheSize.get();
+ return this.realCacheSize.sum();
}
private long acceptableSize() {
@@ -791,7 +792,7 @@ public class BucketCache implements BlockCache, HeapSize {
if (LOG.isDebugEnabled() && msgBuffer != null) {
LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
" of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
- StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
+ StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize));
}
long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
@@ -1016,7 +1017,7 @@ public class BucketCache implements BlockCache, HeapSize {
// Always remove from ramCache even if we failed adding it to the block cache above.
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
if (ramCacheEntry != null) {
- heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
+ heapSize.add(-1 * entries.get(i).getData().heapSize());
} else if (bucketEntries[i] != null){
// Block should have already been evicted. Remove it and free space.
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
@@ -1195,12 +1196,12 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public long heapSize() {
- return this.heapSize.get();
+ return this.heapSize.sum();
}
@Override
public long size() {
- return this.realCacheSize.get();
+ return this.realCacheSize.sum();
}
@Override
@@ -1215,7 +1216,7 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public long getBlockCount() {
- return this.blockNumber.get();
+ return this.blockNumber.sum();
}
@Override
@@ -1438,7 +1439,7 @@ public class BucketCache implements BlockCache, HeapSize {
public BucketEntry writeToCache(final IOEngine ioEngine,
final BucketAllocator bucketAllocator,
final UniqueIndexMap<Integer> deserialiserMap,
- final AtomicLong realCacheSize) throws CacheFullException, IOException,
+ final LongAdder realCacheSize) throws CacheFullException, IOException,
BucketAllocatorException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
@@ -1468,7 +1469,7 @@ public class BucketCache implements BlockCache, HeapSize {
throw ioe;
}
- realCacheSize.addAndGet(len);
+ realCacheSize.add(len);
return bucketEntry;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 82b8f1b..a21b48f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -24,7 +24,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -52,8 +52,8 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private int maxCapacity;
// metrics (shared across all queues)
- private AtomicLong numGeneralCallsDropped;
- private AtomicLong numLifoModeSwitches;
+ private LongAdder numGeneralCallsDropped;
+ private LongAdder numLifoModeSwitches;
// Both are in milliseconds
private volatile int codelTargetDelay;
@@ -76,7 +76,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicBoolean isOverloaded = new AtomicBoolean(false);
public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
- double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
+ double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) {
this.maxCapacity = capacity;
this.queue = new LinkedBlockingDeque<>(capacity);
this.codelTargetDelay = targetDelay;
@@ -112,13 +112,13 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
CallRunner cr;
while(true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
- numLifoModeSwitches.incrementAndGet();
+ numLifoModeSwitches.increment();
cr = queue.takeLast();
} else {
cr = queue.takeFirst();
}
if (needToDrop(cr)) {
- numGeneralCallsDropped.incrementAndGet();
+ numGeneralCallsDropped.increment();
cr.drop();
} else {
return cr;
@@ -135,7 +135,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
// Only count once per switch.
if (!switched) {
switched = true;
- numLifoModeSwitches.incrementAndGet();
+ numLifoModeSwitches.increment();
}
cr = queue.pollLast();
} else {
@@ -146,7 +146,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
return cr;
}
if (needToDrop(cr)) {
- numGeneralCallsDropped.incrementAndGet();
+ numGeneralCallsDropped.increment();
cr.drop();
} else {
return cr;
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 7e35fe8..2eca304 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -248,7 +248,7 @@ public class SplitLogManager {
logDirs + " for serverName=" + serverNames);
FileStatus[] logfiles = getFileList(logDirs, filter);
status.setStatus("Checking directory contents...");
- SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_batch_start.increment();
LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
" for " + serverNames);
long t = EnvironmentEdgeManager.currentTime();
@@ -278,7 +278,7 @@ public class SplitLogManager {
if (batch.done != batch.installed) {
batch.isDead = true;
- SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_batch_err.increment();
LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
+ " but only " + batch.done + " done");
String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
@@ -302,7 +302,7 @@ public class SplitLogManager {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
}
}
- SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
+ SplitLogCounters.tot_mgr_log_split_batch_success.increment();
}
String msg =
"finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
@@ -474,7 +474,7 @@ public class SplitLogManager {
}
while (oldtask.status == FAILURE) {
LOG.debug("wait for status of task " + path + " to change to DELETED");
- SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
+ SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
try {
oldtask.wait();
} catch (InterruptedException e) {
@@ -694,7 +694,7 @@ public class SplitLogManager {
}
found_assigned_task = true;
if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
- SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
resubmitted++;
} else {
@@ -741,7 +741,7 @@ public class SplitLogManager {
}
}
getSplitLogManagerCoordination().checkTasks();
- SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
+ SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
LOG.debug("resubmitting unassigned task(s) after timeout");
}
Set<String> failedDeletions =
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
index 308e216..4309dd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
@@ -76,9 +77,9 @@ public class MobFileCache {
// caches access count
private final AtomicLong count = new AtomicLong(0);
private long lastAccess = 0;
- private final AtomicLong miss = new AtomicLong(0);
+ private final LongAdder miss = new LongAdder();
private long lastMiss = 0;
- private final AtomicLong evictedFileCount = new AtomicLong(0);
+ private final LongAdder evictedFileCount = new LongAdder();
private long lastEvictedFileCount = 0;
// a lock to sync the evict to guarantee the eviction occurs in sequence.
@@ -163,7 +164,7 @@ public class MobFileCache {
for (CachedMobFile evictedFile : evictedFiles) {
closeFile(evictedFile);
}
- evictedFileCount.addAndGet(evictedFiles.size());
+ evictedFileCount.add(evictedFiles.size());
}
}
@@ -180,7 +181,7 @@ public class MobFileCache {
CachedMobFile evictedFile = map.remove(fileName);
if (evictedFile != null) {
evictedFile.close();
- evictedFileCount.incrementAndGet();
+ evictedFileCount.increment();
}
} catch (IOException e) {
LOG.error("Failed to evict the file " + fileName, e);
@@ -219,7 +220,7 @@ public class MobFileCache {
cached = CachedMobFile.create(fs, path, conf, cacheConf);
cached.open();
map.put(fileName, cached);
- miss.incrementAndGet();
+ miss.increment();
}
}
cached.open();
@@ -294,7 +295,7 @@ public class MobFileCache {
* @return The count of misses to the mob file cache.
*/
public long getMissCount() {
- return miss.get();
+ return miss.sum();
}
/**
@@ -302,7 +303,7 @@ public class MobFileCache {
* @return The number of items evicted from the mob file cache.
*/
public long getEvictedFileCount() {
- return evictedFileCount.get();
+ return evictedFileCount.sum();
}
/**
@@ -310,7 +311,7 @@ public class MobFileCache {
* @return The hit ratio to the mob file cache.
*/
public double getHitRatio() {
- return count.get() == 0 ? 0 : ((float) (count.get() - miss.get())) / (float) count.get();
+ return count.get() == 0 ? 0 : ((float) (count.get() - miss.sum())) / (float) count.get();
}
/**
@@ -318,8 +319,8 @@ public class MobFileCache {
*/
public void printStatistics() {
long access = count.get() - lastAccess;
- long missed = miss.get() - lastMiss;
- long evicted = evictedFileCount.get() - lastEvictedFileCount;
+ long missed = miss.sum() - lastMiss;
+ long evicted = evictedFileCount.sum() - lastEvictedFileCount;
int hitRatio = access == 0 ? 0 : (int) (((float) (access - missed)) / (float) access * 100);
LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
+ (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted);
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index e818426..8d40796 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -29,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -220,7 +220,7 @@ public class ChunkCreator {
/** Statistics thread */
private static final int statThreadPeriod = 60 * 5;
private final AtomicLong chunkCount = new AtomicLong();
- private final AtomicLong reusedChunkCount = new AtomicLong();
+ private final LongAdder reusedChunkCount = new LongAdder();
MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) {
this.maxCount = maxCount;
@@ -254,7 +254,7 @@ public class ChunkCreator {
Chunk chunk = reclaimedChunks.poll();
if (chunk != null) {
chunk.reset();
- reusedChunkCount.incrementAndGet();
+ reusedChunkCount.increment();
} else {
// Make a chunk iff we have not yet created the maxCount chunks
while (true) {
@@ -303,7 +303,7 @@ public class ChunkCreator {
private void logStats() {
if (!LOG.isDebugEnabled()) return;
long created = chunkCount.get();
- long reused = reusedChunkCount.get();
+ long reused = reusedChunkCount.sum();
long total = created + reused;
LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ ",created chunk count=" + created
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index bff8b7f..401c0d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -281,12 +281,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final LongAdder blockedRequestsCount = new LongAdder();
// Compaction LongAdders
- final AtomicLong compactionsFinished = new AtomicLong(0L);
- final AtomicLong compactionsFailed = new AtomicLong(0L);
- final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
- final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
- final AtomicLong compactionsQueued = new AtomicLong(0L);
- final AtomicLong flushesQueued = new AtomicLong(0L);
+ final LongAdder compactionsFinished = new LongAdder();
+ final LongAdder compactionsFailed = new LongAdder();
+ final LongAdder compactionNumFilesCompacted = new LongAdder();
+ final LongAdder compactionNumBytesCompacted = new LongAdder();
+ final LongAdder compactionsQueued = new LongAdder();
+ final LongAdder flushesQueued = new LongAdder();
private final WAL wal;
private final HRegionFileSystem fs;
@@ -2272,7 +2272,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
if(fs.isFlushSucceeded()) {
- flushesQueued.set(0L);
+ flushesQueued.reset();
}
status.markComplete("Flush successful");
@@ -8100,27 +8100,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
// metrics
- compactionsFinished.incrementAndGet();
- compactionNumFilesCompacted.addAndGet(numFiles);
- compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
+ compactionsFinished.increment();
+ compactionNumFilesCompacted.add(numFiles);
+ compactionNumBytesCompacted.add(filesSizeCompacted);
assert newValue >= 0;
}
public void reportCompactionRequestFailure() {
- compactionsFailed.incrementAndGet();
+ compactionsFailed.increment();
}
public void incrementCompactionsQueuedCount() {
- compactionsQueued.incrementAndGet();
+ compactionsQueued.increment();
}
public void decrementCompactionsQueuedCount() {
- compactionsQueued.decrementAndGet();
+ compactionsQueued.decrement();
}
public void incrementFlushesQueuedCount() {
- flushesQueued.incrementAndGet();
+ flushesQueued.increment();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index 75585f5..40e268d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -134,17 +134,17 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
@Override
public long getNumFilesCompacted() {
- return this.region.compactionNumFilesCompacted.get();
+ return this.region.compactionNumFilesCompacted.sum();
}
@Override
public long getNumBytesCompacted() {
- return this.region.compactionNumBytesCompacted.get();
+ return this.region.compactionNumBytesCompacted.sum();
}
@Override
public long getNumCompactionsCompleted() {
- return this.region.compactionsFinished.get();
+ return this.region.compactionsFinished.sum();
}
@Override
@@ -161,17 +161,17 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
@Override
public long getNumCompactionsFailed() {
- return this.region.compactionsFailed.get();
+ return this.region.compactionsFailed.sum();
}
@Override
public long getNumCompactionsQueued() {
- return this.region.compactionsQueued.get();
+ return this.region.compactionsQueued.sum();
}
@Override
public long getNumFlushesQueued() {
- return this.region.flushesQueued.get();
+ return this.region.flushesQueued.sum();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index a41a731..18a8e25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.lang.management.MemoryType;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -38,11 +38,11 @@ import org.apache.hadoop.hbase.util.Pair;
public class RegionServerAccounting {
// memstore data size
- private final AtomicLong globalMemstoreDataSize = new AtomicLong(0);
+ private final LongAdder globalMemstoreDataSize = new LongAdder();
// memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell
// POJOs and entry overhead of them onto memstore. When on heap MSLAB, this will be include heap
// overhead as well as the cell data size. Ya cell data is in on heap area only then.
- private final AtomicLong globalMemstoreHeapSize = new AtomicLong(0);
+ private final LongAdder globalMemstoreHeapSize = new LongAdder();
// Store the edits size during replaying WAL. Use this to roll back the
// global memstore size once a region opening failed.
@@ -115,14 +115,14 @@ public class RegionServerAccounting {
* @return the global Memstore data size in the RegionServer
*/
public long getGlobalMemstoreDataSize() {
- return globalMemstoreDataSize.get();
+ return globalMemstoreDataSize.sum();
}
/**
* @return the global memstore heap size in the RegionServer
*/
public long getGlobalMemstoreHeapSize() {
- return this.globalMemstoreHeapSize.get();
+ return this.globalMemstoreHeapSize.sum();
}
/**
@@ -130,13 +130,13 @@ public class RegionServerAccounting {
* the global Memstore size
*/
public void incGlobalMemstoreSize(MemstoreSize memStoreSize) {
- globalMemstoreDataSize.addAndGet(memStoreSize.getDataSize());
- globalMemstoreHeapSize.addAndGet(memStoreSize.getHeapSize());
+ globalMemstoreDataSize.add(memStoreSize.getDataSize());
+ globalMemstoreHeapSize.add(memStoreSize.getHeapSize());
}
public void decGlobalMemstoreSize(MemstoreSize memStoreSize) {
- globalMemstoreDataSize.addAndGet(-memStoreSize.getDataSize());
- globalMemstoreHeapSize.addAndGet(-memStoreSize.getHeapSize());
+ globalMemstoreDataSize.add(-memStoreSize.getDataSize());
+ globalMemstoreHeapSize.add(-memStoreSize.getHeapSize());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
index 8ad150b..b204fb6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
@@ -76,7 +76,7 @@ public class WALSplitterHandler extends EventHandler {
SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
break;
case PREEMPTED:
- SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
+ SplitLogCounters.tot_wkr_preempt_task.increment();
LOG.warn("task execution preempted " + splitTaskDetails.getWALFile());
break;
case ERR:
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index 56517a4..875acd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -250,24 +251,24 @@ public final class Canary implements Tool {
public static class RegionStdOutSink extends StdOutSink {
- private Map<String, AtomicLong> perTableReadLatency = new HashMap<>();
- private AtomicLong writeLatency = new AtomicLong();
+ private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
+ private LongAdder writeLatency = new LongAdder();
- public Map<String, AtomicLong> getReadLatencyMap() {
+ public Map<String, LongAdder> getReadLatencyMap() {
return this.perTableReadLatency;
}
- public AtomicLong initializeAndGetReadLatencyForTable(String tableName) {
- AtomicLong initLatency = new AtomicLong(0L);
+ public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
+ LongAdder initLatency = new LongAdder();
this.perTableReadLatency.put(tableName, initLatency);
return initLatency;
}
public void initializeWriteLatency() {
- this.writeLatency.set(0L);
+ this.writeLatency.reset();
}
- public AtomicLong getWriteLatency() {
+ public LongAdder getWriteLatency() {
return this.writeLatency;
}
}
@@ -323,10 +324,10 @@ public final class Canary implements Tool {
private TaskType taskType;
private boolean rawScanEnabled;
private ServerName serverName;
- private AtomicLong readWriteLatency;
+ private LongAdder readWriteLatency;
RegionTask(Connection connection, HRegionInfo region, ServerName serverName, RegionStdOutSink sink,
- TaskType taskType, boolean rawScanEnabled, AtomicLong rwLatency) {
+ TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) {
this.connection = connection;
this.region = region;
this.serverName = serverName;
@@ -414,7 +415,7 @@ public final class Canary implements Tool {
rs.next();
}
stopWatch.stop();
- this.readWriteLatency.addAndGet(stopWatch.getTime());
+ this.readWriteLatency.add(stopWatch.getTime());
sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
} catch (Exception e) {
sink.publishReadFailure(serverName, region, column, e);
@@ -466,7 +467,7 @@ public final class Canary implements Tool {
long startTime = System.currentTimeMillis();
table.put(put);
long time = System.currentTimeMillis() - startTime;
- this.readWriteLatency.addAndGet(time);
+ this.readWriteLatency.add(time);
sink.publishWriteTiming(serverName, region, column, time);
} catch (Exception e) {
sink.publishWriteFailure(serverName, region, column, e);
@@ -1049,7 +1050,7 @@ public final class Canary implements Tool {
}
this.initialized = true;
for (String table : tables) {
- AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
+ LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ,
this.rawScanEnabled, readLatency));
}
@@ -1068,7 +1069,7 @@ public final class Canary implements Tool {
}
// sniff canary table with write operation
regionSink.initializeWriteLatency();
- AtomicLong writeTableLatency = regionSink.getWriteLatency();
+ LongAdder writeTableLatency = regionSink.getWriteLatency();
taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getTableDescriptor(writeTableName),
executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency));
}
@@ -1080,7 +1081,7 @@ public final class Canary implements Tool {
LOG.error("Sniff region failed!", e);
}
}
- Map<String, AtomicLong> actualReadTableLatency = regionSink.getReadLatencyMap();
+ Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
String tableName = entry.getKey();
if (actualReadTableLatency.containsKey(tableName)) {
@@ -1167,7 +1168,7 @@ public final class Canary implements Tool {
for (HTableDescriptor table : admin.listTables()) {
if (admin.isTableEnabled(table.getTableName())
&& (!table.getTableName().equals(writeTableName))) {
- AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString());
+ LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString());
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled, readLatency));
}
}
@@ -1235,7 +1236,7 @@ public final class Canary implements Tool {
* @throws Exception
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
- ExecutorService executor, TaskType taskType, boolean rawScanEnabled, AtomicLong readLatency) throws Exception {
+ ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
tableName));
@@ -1254,7 +1255,7 @@ public final class Canary implements Tool {
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
- boolean rawScanEnabled, AtomicLong rwLatency) throws Exception {
+ boolean rawScanEnabled, LongAdder rwLatency) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 4f6ffd2..e789b4f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
@@ -139,7 +140,7 @@ public class TestBucketWriterThread {
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
- (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
+ (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
@@ -162,7 +163,7 @@ public class TestBucketWriterThread {
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
- (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
+ (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 7df3086..b1d7e22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -1049,15 +1050,15 @@ public class TestDistributedLogSplitting {
long waitTime = 80000;
long endt = curt + waitTime;
while (curt < endt) {
- if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
- tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
- tot_wkr_preempt_task.get()) == 0) {
+ if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
+ tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
+ tot_wkr_preempt_task.sum()) == 0) {
Thread.yield();
curt = System.currentTimeMillis();
} else {
- assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
- tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
- tot_wkr_preempt_task.get()));
+ assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
+ tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
+ tot_wkr_preempt_task.sum()));
return;
}
}
@@ -1717,16 +1718,16 @@ public class TestDistributedLogSplitting {
}
}
- private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+ private void waitForCounter(LongAdder ctr, long oldval, long newval,
long timems) {
long curt = System.currentTimeMillis();
long endt = curt + timems;
while (curt < endt) {
- if (ctr.get() == oldval) {
+ if (ctr.sum() == oldval) {
Thread.yield();
curt = System.currentTimeMillis();
} else {
- assertEquals(newval, ctr.get());
+ assertEquals(newval, ctr.sum());
return;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index 4c7bc54..6fd3e8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -171,12 +172,12 @@ public class TestSplitLogManager {
long eval();
}
- private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
+ private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
throws Exception {
Expr e = new Expr() {
@Override
public long eval() {
- return ctr.get();
+ return ctr.sum();
}
};
waitForCounter(e, oldval, newval, timems);
@@ -199,7 +200,7 @@ public class TestSplitLogManager {
private Task findOrCreateOrphanTask(String path) {
return slm.tasks.computeIfAbsent(path, k -> {
LOG.info("creating orphan task " + k);
- SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
+ SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
return new Task();
});
}
@@ -214,7 +215,7 @@ public class TestSplitLogManager {
slm.enqueueSplitTask(name, batch);
assertEquals(1, batch.installed);
assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
- assertEquals(1L, tot_mgr_node_create_queued.get());
+ assertEquals(1L, tot_mgr_node_create_queued.sum());
LOG.debug("waiting for task node creation");
listener.waitForCreation();
@@ -286,7 +287,7 @@ public class TestSplitLogManager {
Task task2 = findOrCreateOrphanTask(tasknode);
assertTrue(task == task2);
LOG.debug("task = " + task);
- assertEquals(1L, tot_mgr_resubmit.get());
+ assertEquals(1L, tot_mgr_resubmit.sum());
assertEquals(1, task.incarnation.get());
assertEquals(0, task.unforcedResubmits.get());
assertTrue(task.isOrphan());
@@ -323,7 +324,7 @@ public class TestSplitLogManager {
waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
Thread.sleep(to + to/2);
- assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get());
+ assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
}
@Test (timeout=180000)
@@ -342,10 +343,10 @@ public class TestSplitLogManager {
waitForCounter(new Expr() {
@Override
public long eval() {
- return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
+ return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
}
}, 0, 1, 5*60000); // wait long enough
- Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
+ Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.sum());
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
@@ -400,23 +401,23 @@ public class TestSplitLogManager {
@Test (timeout=180000)
public void testTaskResigned() throws Exception {
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
- assertEquals(tot_mgr_resubmit.get(), 0);
+ assertEquals(tot_mgr_resubmit.sum(), 0);
slm = new SplitLogManager(master, conf);
- assertEquals(tot_mgr_resubmit.get(), 0);
+ assertEquals(tot_mgr_resubmit.sum(), 0);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
- assertEquals(tot_mgr_resubmit.get(), 0);
+ assertEquals(tot_mgr_resubmit.sum(), 0);
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
- assertEquals(tot_mgr_resubmit.get(), 0);
+ assertEquals(tot_mgr_resubmit.sum(), 0);
SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
- assertEquals(tot_mgr_resubmit.get(), 0);
+ assertEquals(tot_mgr_resubmit.sum(), 0);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
ZKUtil.checkExists(zkw, tasknode);
// Could be small race here.
- if (tot_mgr_resubmit.get() == 0) {
+ if (tot_mgr_resubmit.sum() == 0) {
waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
}
- assertEquals(tot_mgr_resubmit.get(), 1);
+ assertEquals(tot_mgr_resubmit.sum(), 1);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
slt = SplitLogTask.parseFrom(taskstate);
@@ -472,10 +473,10 @@ public class TestSplitLogManager {
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
- if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
+ if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
slm.handleDeadWorker(worker1);
- if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
- if (tot_mgr_resubmit_dead_server_task.get() == 0) {
+ if (tot_mgr_resubmit.sum() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
+ if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
}
@@ -497,10 +498,10 @@ public class TestSplitLogManager {
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
- if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
+ if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
// Not yet resubmitted.
- Assert.assertEquals(0, tot_mgr_resubmit.get());
+ Assert.assertEquals(0, tot_mgr_resubmit.sum());
// This server becomes dead
Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
@@ -508,7 +509,7 @@ public class TestSplitLogManager {
Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
// It has been resubmitted
- Assert.assertEquals(1, tot_mgr_resubmit.get());
+ Assert.assertEquals(1, tot_mgr_resubmit.sum());
}
@Test (timeout=180000)
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb5e4367/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 18df013..1d2b038 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -151,32 +151,32 @@ public class TestSplitLogWorker {
}
}
- private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
+ private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
throws Exception {
- assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
+ assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
waitForCounterBoolean(ctr, oldval, newval, timems));
}
- private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
+ private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
long timems) throws Exception {
return waitForCounterBoolean(ctr, oldval, newval, timems, true);
}
- private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
+ private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
long timems, boolean failIfTimeout) throws Exception {
long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
- return (ctr.get() >= newval);
+ return (ctr.sum() >= newval);
}
});
if( timeWaited > 0) {
// when not timed out
- assertEquals(newval, ctr.get());
+ assertEquals(newval, ctr.sum());
}
return true;
}
@@ -293,7 +293,7 @@ public class TestSplitLogWorker {
// not it, that we fell through to the next counter in line and it was set.
assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
WAIT_TIME, false) ||
- SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
+ SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));