You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2018/03/30 20:29:23 UTC

[accumulo] branch master updated: ACCUMULO-3807 Avoid Copy/Sort column on WAL recovery exceeds 100% (#381)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 55c625a  ACCUMULO-3807 Avoid Copy/Sort column on WAL recovery exceeds 100% (#381)
55c625a is described below

commit 55c625a7cc14ea4971b633b0ecb50ea60bc447b2
Author: ghajos <ro...@gmail.com>
AuthorDate: Fri Mar 30 22:29:21 2018 +0200

    ACCUMULO-3807 Avoid Copy/Sort column on WAL recovery exceeds 100% (#381)
---
 .../main/java/org/apache/accumulo/tserver/log/DfsLogger.java  | 11 ++++++++---
 .../main/java/org/apache/accumulo/tserver/log/LogSorter.java  |  6 +++++-
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 1bfeb73..56a2c5d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -448,9 +448,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
         replication = fs.getDefaultReplication(new Path(logPath));
-      long blockSize = conf.getConfiguration().getAsBytes(Property.TSERV_WAL_BLOCKSIZE);
-      if (blockSize == 0)
-        blockSize = (long) (conf.getConfiguration().getAsBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+      long blockSize = getWalBlockSize(conf.getConfiguration());
       if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
         logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
       else
@@ -513,6 +511,13 @@ public class DfsLogger implements Comparable<DfsLogger> {
     log.debug("Got new write-ahead log: {}", this);
   }
 
+  static long getWalBlockSize(AccumuloConfiguration conf) {
+    long blockSize = conf.getAsBytes(Property.TSERV_WAL_BLOCKSIZE);
+    if (blockSize == 0)
+      blockSize = (long) (conf.getAsBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+    return blockSize;
+  }
+
   @Override
   public String toString() {
     String fileName = getFileName();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 376a71e..5477087 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -220,6 +220,7 @@ public class LogSorter {
 
   ThreadPoolExecutor threadPool;
   private final Instance instance;
+  private double walBlockSize;
 
   public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
     this.instance = instance;
@@ -227,6 +228,7 @@ public class LogSorter {
     this.conf = conf;
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
     this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
+    this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
   public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
@@ -241,7 +243,9 @@ public class LogSorter {
         RecoveryStatus status = new RecoveryStatus();
         status.name = entries.getKey();
         try {
-          status.progress = entries.getValue().getBytesCopied() / (0.0 + conf.getAsBytes(Property.TSERV_WALOG_MAX_SIZE));
+          double progress = entries.getValue().getBytesCopied() / walBlockSize;
+          // to be sure progress does not exceed 100%
+          status.progress = Math.min(progress, 99.9);
         } catch (IOException ex) {
           log.warn("Error getting bytes read");
         }

-- 
To stop receiving notification emails like this one, please contact
ctubbsii@apache.org.