You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/11/07 01:44:29 UTC
[accumulo] branch master updated: check that the WAL directory and
log files can sync (#1410)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 7c2ecde check that the WAL directory and log files can sync (#1410)
7c2ecde is described below
commit 7c2ecde0c1001cf7f03fa31841e4c3ac0a8036dc
Author: etseidl <et...@users.noreply.github.com>
AuthorDate: Wed Nov 6 17:44:18 2019 -0800
check that the WAL directory and log files can sync (#1410)
check that the WAL directory and log files can sync, both at
tserver startup and log file creation
---
.../apache/accumulo/server/fs/VolumeManager.java | 3 +++
.../accumulo/server/fs/VolumeManagerImpl.java | 23 ++++++++++++++++++++++
.../org/apache/accumulo/tserver/TabletServer.java | 22 +++++++++++++++++++++
.../org/apache/accumulo/tserver/log/DfsLogger.java | 12 ++++++++---
4 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
index 9b5a0f8..0260c9b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -166,6 +166,9 @@ public interface VolumeManager {
// decide on which of the given locations to create a new file
String choose(VolumeChooserEnvironment env, String[] options);
+ // Reports whether sync and flush are supported for the given path.
+ boolean canSyncAndFlush(Path path); // true when supported, false otherwise
+
/**
* Fetch the default Volume
*/
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index a9ce173..38f0b6a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -496,6 +497,28 @@ public class VolumeManagerImpl implements VolumeManager {
}
@Override
+ public boolean canSyncAndFlush(Path path) { // reports whether the given path supports sync/flush
+ // The assumption is that all filesystems support sync/flush except for
+ // HDFS erasure coding. HDFS config options are not checked here
+ // since that's already checked in ensureSyncIsEnabled().
+ FileSystem fs = getVolumeByPath(path).getFileSystem(); // filesystem of the volume resolved from path
+ if (fs instanceof DistributedFileSystem) { // only HDFS is inspected; any other filesystem falls through to true
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ try {
+ ErasureCodingPolicy currEC = dfs.getErasureCodingPolicy(path);
+ if (currEC != null && !currEC.isReplicationPolicy()) { // a policy is set and it is not the replication policy
+ return false;
+ }
+ } catch (IOException e) {
+ // Best effort: the lookup may fail, e.g. if the dir doesn't exist or is not EC.
+ // Log at debug rather than spam warnings, then fall through and return true.
+ log.debug("exception getting EC policy for " + path, e);
+ }
+ }
+ return true;
+ }
+
+ @Override
public Volume getDefaultVolume() {
return defaultVolume;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ec8c636..ee87c0c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -162,6 +162,7 @@ import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory.RateProv
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.fate.util.Retry.RetryFactory;
+import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
@@ -171,6 +172,7 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.TabletLevel;
@@ -179,6 +181,8 @@ import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
+import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.log.SortedLogState;
@@ -372,6 +376,10 @@ public class TabletServer extends AbstractServer {
final long logBusyTabletsDelay =
aconf.getTimeInMillis(Property.TSERV_LOG_BUSY_TABLETS_INTERVAL);
+ // check early whether the WAL directory supports sync. issue warning if
+ // it doesn't
+ checkWalCanSync(context);
+
// This thread will calculate and log out the busiest tablets based on ingest count and
// query count every #{logBusiestTabletsDelay}
if (numBusyTabletsToLog > 0) {
@@ -3002,6 +3010,20 @@ public class TabletServer extends AbstractServer {
}
}
+ private void checkWalCanSync(ServerContext context) { // only logs a warning; never throws or aborts startup
+ VolumeChooserEnvironment chooserEnv = // chooser environment built with the LOGGER scope
+ new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.ChooserScope.LOGGER, context);
+ String logPath = fs.choose(chooserEnv, ServerConstants.getBaseUris(context)) + Path.SEPARATOR // chosen base URI + separator + WAL dir
+ + ServerConstants.WAL_DIR;
+ if (!fs.canSyncAndFlush(new Path(logPath))) {
+ // Sleep before warning in case this is at cluster start, to give the monitor
+ // time to start so the warning will be more visible.
+ UtilWaitThread.sleep(5000); // presumably milliseconds (about 5 seconds) -- confirm against UtilWaitThread
+ log.warn("WAL directory ({}) implementation does not support sync or flush."
+ + " Data loss may occur.", logPath);
+ }
+ }
+
private void config() {
log.info("Tablet server starting on {}", getHostname());
majorCompactorThread =
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 46a119d..d5d2031 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -431,17 +431,23 @@ public class DfsLogger implements Comparable<DfsLogger> {
metaReference = toString();
LoggerOperation op = null;
try {
+ Path logfilePath = new Path(logPath);
short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
if (replication == 0)
- replication = fs.getDefaultReplication(new Path(logPath));
+ replication = fs.getDefaultReplication(logfilePath);
long blockSize = getWalBlockSize(conf.getConfiguration());
if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
- logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
+ logFile = fs.createSyncable(logfilePath, 0, replication, blockSize);
else
- logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
+ logFile = fs.create(logfilePath, true, 0, replication, blockSize);
sync = logFile.getClass().getMethod("hsync");
flush = logFile.getClass().getMethod("hflush");
+ // check again that logfile can be sync'd
+ if (!fs.canSyncAndFlush(logfilePath)) {
+ log.warn("sync not supported for log file {}. Data loss may occur.", logPath);
+ }
+
// Initialize the log file with a header and its encryption
CryptoService cryptoService = context.getCryptoService();
logFile.write(LOG_FILE_HEADER_V4.getBytes(UTF_8));