You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/10/04 02:17:19 UTC
git commit: HBASE-12104 Some optimization and bugfix for
HTableMultiplexer (Yi Deng)
Repository: hbase
Updated Branches:
refs/heads/master 3acdf0682 -> bc4f25ff4
HBASE-12104 Some optimization and bugfix for HTableMultiplexer (Yi Deng)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bc4f25ff
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bc4f25ff
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bc4f25ff
Branch: refs/heads/master
Commit: bc4f25ff452da5cbb692088b51c6fd19e3f9fdc3
Parents: 3acdf06
Author: stack <st...@apache.org>
Authored: Fri Oct 3 17:17:09 2014 -0700
Committer: stack <st...@apache.org>
Committed: Fri Oct 3 17:17:09 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/HTableMultiplexer.java | 407 ++++++++++---------
1 file changed, 208 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/bc4f25ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index e8c6909..fae0a94 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -28,23 +28,28 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
* Each put will be sharded into different buffer queues based on its destination region server.
@@ -63,24 +68,25 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceStability.Evolving
public class HTableMultiplexer {
private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
- private static int poolID = 0;
- static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
-
- /** The map between each region server to its corresponding buffer queue */
- private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap =
- new ConcurrentHashMap<>();
+ public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
+ "hbase.tablemultiplexer.flush.period.ms";
+ public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
+ public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
+ "hbase.client.max.retries.in.queue";
/** The map between each region server to its flush worker */
- private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap =
+ private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
new ConcurrentHashMap<>();
private final Configuration conf;
private final ClusterConnection conn;
private final ExecutorService pool;
private final int retryNum;
- private int perRegionServerBufferQueueSize;
+ private final int perRegionServerBufferQueueSize;
private final int maxKeyValueSize;
+ private final ScheduledExecutorService executor;
+ private final long flushPeriod;
/**
* @param conf The HBaseConfiguration
@@ -96,6 +102,11 @@ public class HTableMultiplexer {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
+ this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
+ int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
+ this.executor =
+ Executors.newScheduledThreadPool(initThreads,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
}
/**
@@ -106,7 +117,7 @@ public class HTableMultiplexer {
* @return true if the request can be accepted by its corresponding buffer queue.
* @throws IOException
*/
- public boolean put(TableName tableName, final Put put) throws IOException {
+ public boolean put(TableName tableName, final Put put) {
return put(tableName, put, this.retryNum);
}
@@ -118,8 +129,7 @@ public class HTableMultiplexer {
* @return the list of puts which could not be queued
* @throws IOException
*/
- public List<Put> put(TableName tableName, final List<Put> puts)
- throws IOException {
+ public List<Put> put(TableName tableName, final List<Put> puts) {
if (puts == null)
return null;
@@ -140,23 +150,22 @@ public class HTableMultiplexer {
return failedPuts;
}
- public List<Put> put(byte[] tableName, final List<Put> puts) throws IOException {
+ /**
+ * Deprecated. Use {@link #put(TableName, List) } instead.
+ */
+ @Deprecated
+ public List<Put> put(byte[] tableName, final List<Put> puts) {
return put(TableName.valueOf(tableName), puts);
}
-
-
+
/**
* The put request will be buffered by its corresponding buffer queue. And the put request will be
* retried before dropping the request.
* Return false if the queue is already full.
- * @param tableName
- * @param put
- * @param retry
* @return true if the request can be accepted by its corresponding buffer queue.
* @throws IOException
*/
- public boolean put(final TableName tableName, final Put put, int retry)
- throws IOException {
+ public boolean put(final TableName tableName, final Put put, int retry) {
if (retry <= 0) {
return false;
}
@@ -166,25 +175,36 @@ public class HTableMultiplexer {
HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
if (loc != null) {
// Add the put pair into its corresponding queue.
-
LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+
// Generate a MultiPutStatus object and offer it into the queue
PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
return queue.offer(s);
}
- } catch (Exception e) {
- LOG.debug("Cannot process the put " + put + " because of " + e);
+ } catch (IOException e) {
+ LOG.debug("Cannot process the put " + put, e);
}
return false;
}
- public boolean put(final byte[] tableName, final Put put, int retry)
- throws IOException {
+ /**
+ * Deprecated. Use {@link #put(TableName, Put) } instead.
+ */
+ @Deprecated
+ public boolean put(final byte[] tableName, final Put put, int retry) {
return put(TableName.valueOf(tableName), put, retry);
}
/**
+ * Deprecated. Use {@link #put(TableName, Put)} instead.
+ */
+ @Deprecated
+ public boolean put(final byte[] tableName, Put put) {
+ return put(TableName.valueOf(tableName), put);
+ }
+
+ /**
* @return the current HTableMultiplexerStatus
*/
public HTableMultiplexerStatus getHTableMultiplexerStatus() {
@@ -192,30 +212,20 @@ public class HTableMultiplexer {
}
private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
- LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
- if (queue == null) {
- synchronized (this.serverToBufferQueueMap) {
- queue = serverToBufferQueueMap.get(addr);
- if (queue == null) {
- // Create a queue for the new region server
- queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
- serverToBufferQueueMap.put(addr, queue);
-
+ FlushWorker worker = serverToFlushWorkerMap.get(addr);
+ if (worker == null) {
+ synchronized (this.serverToFlushWorkerMap) {
+ worker = serverToFlushWorkerMap.get(addr);
+ if (worker == null) {
// Create the flush worker
- HTableFlushWorker worker =
- new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+ worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
+ pool, executor);
this.serverToFlushWorkerMap.put(addr, worker);
-
- // Launch a daemon thread to flush the puts
- // from the queue to its corresponding region server.
- String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++);
- Thread t = new Thread(worker, name);
- t.setDaemon(true);
- t.start();
+ executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
}
}
}
- return queue;
+ return worker.getQueue();
}
/**
@@ -223,7 +233,7 @@ public class HTableMultiplexer {
* report the number of buffered requests and the number of the failed (dropped) requests
* in total or on per region server basis.
*/
- static class HTableMultiplexerStatus {
+ public static class HTableMultiplexerStatus {
private long totalFailedPutCounter;
private long totalBufferedPutCounter;
private long maxLatency;
@@ -234,7 +244,7 @@ public class HTableMultiplexer {
private Map<String, Long> serverToMaxLatencyMap;
public HTableMultiplexerStatus(
- Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+ Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
this.totalBufferedPutCounter = 0;
this.totalFailedPutCounter = 0;
this.maxLatency = 0;
@@ -247,17 +257,17 @@ public class HTableMultiplexer {
}
private void initialize(
- Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+ Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
if (serverToFlushWorkerMap == null) {
return;
}
long averageCalcSum = 0;
int averageCalcCount = 0;
- for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap
+ for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
.entrySet()) {
HRegionLocation addr = entry.getKey();
- HTableFlushWorker worker = entry.getValue();
+ FlushWorker worker = entry.getValue();
long bufferedCounter = worker.getTotalBufferedCount();
long failedCounter = worker.getTotalFailedCount();
@@ -325,25 +335,15 @@ public class HTableMultiplexer {
}
private static class PutStatus {
- private final HRegionInfo regionInfo;
- private final Put put;
- private final int retryCount;
- public PutStatus(final HRegionInfo regionInfo, final Put put,
- final int retryCount) {
+ public final HRegionInfo regionInfo;
+ public final Put put;
+ public final int retryCount;
+
+ public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
this.regionInfo = regionInfo;
this.put = put;
this.retryCount = retryCount;
}
-
- public HRegionInfo getRegionInfo() {
- return regionInfo;
- }
- public Put getPut() {
- return put;
- }
- public int getRetryCount() {
- return retryCount;
- }
}
/**
@@ -386,26 +386,38 @@ public class HTableMultiplexer {
}
}
- private static class HTableFlushWorker implements Runnable {
+ private static class FlushWorker implements Runnable {
private final HRegionLocation addr;
- private final Configuration conf;
- private final ClusterConnection conn;
+ private final AsyncProcess asyncProc;
private final LinkedBlockingQueue<PutStatus> queue;
- private final HTableMultiplexer htableMultiplexer;
+ private final HTableMultiplexer multiplexer;
private final AtomicLong totalFailedPutCount = new AtomicLong(0);
- private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0);
+ private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
private final AtomicLong maxLatency = new AtomicLong(0);
private final ExecutorService pool;
+ private final List<PutStatus> processingList = new ArrayList<>();
+ private final ScheduledExecutorService executor;
+ private final int maxRetryInQueue;
+ private final AtomicInteger retryInQueue = new AtomicInteger(0);
+ private final int rpcTimeOutMs;
- public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
- HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue, ExecutorService pool) {
+ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
+ HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
+ ExecutorService pool, ScheduledExecutorService executor) {
this.addr = addr;
- this.conf = conf;
- this.conn = conn;
- this.htableMultiplexer = htableMultiplexer;
- this.queue = queue;
+ this.asyncProc = conn.getAsyncProcess();
+ this.multiplexer = htableMultiplexer;
+ this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
this.pool = pool;
+ this.executor = executor;
+ this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+ this.rpcTimeOutMs =
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ }
+
+ protected LinkedBlockingQueue<PutStatus> getQueue() {
+ return this.queue;
}
public long getTotalFailedCount() {
@@ -413,7 +425,7 @@ public class HTableMultiplexer {
}
public long getTotalBufferedCount() {
- return queue.size() + currentProcessingPutCount.get();
+ return queue.size() + currentProcessingCount.get();
}
public AtomicAverageCounter getAverageLatencyCounter() {
@@ -424,149 +436,146 @@ public class HTableMultiplexer {
return this.maxLatency.getAndSet(0);
}
- private boolean resubmitFailedPut(PutStatus failedPutStatus,
- HRegionLocation oldLoc) throws IOException {
- Put failedPut = failedPutStatus.getPut();
- // The currentPut is failed. So get the table name for the currentPut.
- TableName tableName = failedPutStatus.getRegionInfo().getTable();
+ private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
// Decrease the retry count
- int retryCount = failedPutStatus.getRetryCount() - 1;
+ final int retryCount = ps.retryCount - 1;
if (retryCount <= 0) {
// Update the failed counter and no retry any more.
return false;
- } else {
- // Retry one more time
- return this.htableMultiplexer.put(tableName, failedPut, retryCount);
}
+
+ int cnt = retryInQueue.incrementAndGet();
+ if (cnt > maxRetryInQueue) {
+ // Too many Puts in queue for resubmit, give up this
+ retryInQueue.decrementAndGet();
+ return false;
+ }
+
+ final Put failedPut = ps.put;
+ // The currentPut is failed. So get the table name for the currentPut.
+ final TableName tableName = ps.regionInfo.getTable();
+
+ // Wait at least RPC timeout time
+ long delayMs = rpcTimeOutMs;
+ delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
+ multiplexer.retryNum - retryCount)));
+
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ boolean succ = false;
+ try {
+ succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
+ } finally {
+ FlushWorker.this.retryInQueue.decrementAndGet();
+ if (!succ) {
+ FlushWorker.this.totalFailedPutCount.incrementAndGet();
+ }
+ }
+ }
+ }, delayMs, TimeUnit.MILLISECONDS);
+ return true;
}
@Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings
- (value = "REC_CATCH_EXCEPTION", justification = "na")
public void run() {
- List<PutStatus> processingList = new ArrayList<>();
- /**
- * The frequency in milliseconds for the current thread to process the corresponding
- * buffer queue.
- **/
- long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);
-
- // initial delay
+ int failedCount = 0;
try {
- Thread.sleep(frequency);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while sleeping");
- Thread.currentThread().interrupt();
- }
+ long start = EnvironmentEdgeManager.currentTime();
+
+ // drain all the queued puts into the tmp list
+ processingList.clear();
+ queue.drainTo(processingList);
+ if (processingList.size() == 0) {
+ // Nothing to flush
+ return;
+ }
- AsyncProcess ap = conn.getAsyncProcess();
+ currentProcessingCount.set(processingList.size());
+ // failedCount is decreased whenever a Put is success or resubmit.
+ failedCount = processingList.size();
+
+ List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
+ MultiAction<Row> actions = new MultiAction<>();
+ for (int i = 0; i < processingList.size(); i++) {
+ PutStatus putStatus = processingList.get(i);
+ Action<Row> action = new Action<Row>(putStatus.put, i);
+ actions.add(putStatus.regionInfo.getRegionName(), action);
+ retainedActions.add(action);
+ }
- long start, elapsed;
- int failedCount = 0;
- while (true) {
+ // Process this multi-put request
+ List<PutStatus> failed = null;
+ Object[] results = new Object[actions.size()];
+ ServerName server = addr.getServerName();
+ Map<ServerName, MultiAction<Row>> actionsByServer =
+ Collections.singletonMap(server, actions);
try {
- start = elapsed = EnvironmentEdgeManager.currentTime();
-
- // Clear the processingList, putToStatusMap and failedCount
- processingList.clear();
- failedCount = 0;
-
- // drain all the queued puts into the tmp list
- queue.drainTo(processingList);
- currentProcessingPutCount.set(processingList.size());
-
- if (processingList.size() > 0) {
- List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
- MultiAction<Row> actions = new MultiAction<>();
- for (int i = 0; i < processingList.size(); i++) {
- PutStatus putStatus = processingList.get(i);
- Action<Row> action = new Action<Row>(putStatus.getPut(), i);
- actions.add(putStatus.getRegionInfo().getRegionName(), action);
- retainedActions.add(action);
- }
-
- // Process this multi-put request
- List<PutStatus> failed = null;
- Object[] results = new Object[actions.size()];
- ServerName server = addr.getServerName();
- Map<ServerName, MultiAction<Row>> actionsByServer =
- Collections.singletonMap(server, actions);
- try {
- AsyncRequestFuture arf =
- ap.submitMultiActions(null, retainedActions, 0L, null, results,
- true, null, null, actionsByServer, pool);
- arf.waitUntilDone();
- if (arf.hasError()) {
- throw arf.getErrors();
- }
- } catch (IOException e) {
- LOG.debug("Caught some exceptions " + e
- + " when flushing puts to region server " + addr.getHostnamePort());
- } finally {
- // mutate list so that it is empty for complete success, or
- // contains only failed records
- // results are returned in the same order as the requests in list
- // walk the list backwards, so we can remove from list without
- // impacting the indexes of earlier members
- for (int i = 0; i < results.length; i++) {
- if (results[i] == null) {
- if (failed == null) {
- failed = new ArrayList<PutStatus>();
- }
- failed.add(processingList.get(i));
- }
- }
- }
-
- if (failed != null) {
- // Resubmit failed puts
- for (PutStatus putStatus : processingList) {
- if (!resubmitFailedPut(putStatus, this.addr)) {
- failedCount++;
- }
+ AsyncRequestFuture arf =
+ asyncProc.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
+ null, actionsByServer, pool);
+ arf.waitUntilDone();
+ if (arf.hasError()) {
+ // We just log and ignore the exception here since failed Puts will be resubmit again.
+ LOG.debug("Caught some exceptions when flushing puts to region server "
+ + addr.getHostnamePort(), arf.getErrors());
+ }
+ } finally {
+ for (int i = 0; i < results.length; i++) {
+ if (results[i] == null) {
+ if (failed == null) {
+ failed = new ArrayList<PutStatus>();
}
- // Update the totalFailedCount
- this.totalFailedPutCount.addAndGet(failedCount);
- }
-
- elapsed = EnvironmentEdgeManager.currentTime() - start;
- // Update latency counters
- averageLatency.add(elapsed);
- if (elapsed > maxLatency.get()) {
- maxLatency.set(elapsed);
+ failed.add(processingList.get(i));
+ } else {
+ failedCount--;
}
-
- // Log some basic info
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processed " + currentProcessingPutCount
- + " put requests for " + addr.getHostnamePort() + " and "
- + failedCount + " failed" + ", latency for this send: "
- + elapsed);
- }
-
- // Reset the current processing put count
- currentProcessingPutCount.set(0);
}
+ }
- // Sleep for a while
- if (elapsed == start) {
- elapsed = EnvironmentEdgeManager.currentTime() - start;
- }
- if (elapsed < frequency) {
- try {
- Thread.sleep(frequency - elapsed);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while sleeping");
- Thread.currentThread().interrupt();
+ if (failed != null) {
+ // Resubmit failed puts
+ for (PutStatus putStatus : processingList) {
+ if (resubmitFailedPut(putStatus, this.addr)) {
+ failedCount--;
}
}
- } catch (Exception e) {
- // Log all the exceptions and move on
- LOG.debug("Caught some exceptions " + e
- + " when flushing puts to region server "
- + addr.getHostnamePort(), e);
}
+
+ long elapsed = EnvironmentEdgeManager.currentTime() - start;
+ // Update latency counters
+ averageLatency.add(elapsed);
+ if (elapsed > maxLatency.get()) {
+ maxLatency.set(elapsed);
+ }
+
+ // Log some basic info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processed " + currentProcessingCount + " put requests for "
+ + addr.getHostnamePort() + " and " + failedCount + " failed"
+ + ", latency for this send: " + elapsed);
+ }
+
+ // Reset the current processing put count
+ currentProcessingCount.set(0);
+ } catch (RuntimeException e) {
+ // To make findbugs happy
+ // Log all the exceptions and move on
+ LOG.debug(
+ "Caught some exceptions " + e + " when flushing puts to region server "
+ + addr.getHostnamePort(), e);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ // Log all the exceptions and move on
+ LOG.debug(
+ "Caught some exceptions " + e + " when flushing puts to region server "
+ + addr.getHostnamePort(), e);
+ } finally {
+ // Update the totalFailedCount
+ this.totalFailedPutCount.addAndGet(failedCount);
}
}
}