You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2018/03/30 20:29:23 UTC
[accumulo] branch master updated: ACCUMULO-3807 Avoid Copy/Sort column on WAL recovery exceeds 100% (#381)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 55c625a ACCUMULO-3807 Avoid Copy/Sort column on WAL recovery exceeds 100% (#381)
55c625a is described below
commit 55c625a7cc14ea4971b633b0ecb50ea60bc447b2
Author: ghajos <ro...@gmail.com>
AuthorDate: Fri Mar 30 22:29:21 2018 +0200
ACCUMULO-3807 Avoid Copy/Sort column on WAL recovery exceeds 100% (#381)
---
.../main/java/org/apache/accumulo/tserver/log/DfsLogger.java | 11 ++++++++---
.../main/java/org/apache/accumulo/tserver/log/LogSorter.java | 6 +++++-
2 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 1bfeb73..56a2c5d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -448,9 +448,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
if (replication == 0)
replication = fs.getDefaultReplication(new Path(logPath));
- long blockSize = conf.getConfiguration().getAsBytes(Property.TSERV_WAL_BLOCKSIZE);
- if (blockSize == 0)
- blockSize = (long) (conf.getConfiguration().getAsBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+ long blockSize = getWalBlockSize(conf.getConfiguration());
if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
else
@@ -513,6 +511,13 @@ public class DfsLogger implements Comparable<DfsLogger> {
log.debug("Got new write-ahead log: {}", this);
}
+ static long getWalBlockSize(AccumuloConfiguration conf) {
+ long blockSize = conf.getAsBytes(Property.TSERV_WAL_BLOCKSIZE);
+ if (blockSize == 0)
+ blockSize = (long) (conf.getAsBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+ return blockSize;
+ }
+
@Override
public String toString() {
String fileName = getFileName();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 376a71e..5477087 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -220,6 +220,7 @@ public class LogSorter {
ThreadPoolExecutor threadPool;
private final Instance instance;
+ private double walBlockSize;
public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
this.instance = instance;
@@ -227,6 +228,7 @@ public class LogSorter {
this.conf = conf;
int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
+ this.walBlockSize = DfsLogger.getWalBlockSize(conf);
}
public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
@@ -241,7 +243,9 @@ public class LogSorter {
RecoveryStatus status = new RecoveryStatus();
status.name = entries.getKey();
try {
- status.progress = entries.getValue().getBytesCopied() / (0.0 + conf.getAsBytes(Property.TSERV_WALOG_MAX_SIZE));
+ double progress = entries.getValue().getBytesCopied() / walBlockSize;
+ // to be sure progress does not exceed 100%
+ status.progress = Math.min(progress, 99.9);
} catch (IOException ex) {
log.warn("Error getting bytes read");
}
--
To stop receiving notification emails like this one, please contact
ctubbsii@apache.org.