You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/05/13 16:25:13 UTC
svn commit: r1481867 - in /accumulo/trunk: ./ assemble/ core/
core/src/main/java/org/apache/accumulo/core/conf/ examples/
fate/src/main/java/org/apache/accumulo/fate/
fate/src/main/java/org/apache/accumulo/fate/zookeeper/ proxy/ server/
server/src/main...
Author: ecn
Date: Mon May 13 14:25:13 2013
New Revision: 1481867
URL: http://svn.apache.org/r1481867
Log:
ACCUMULO-1328 add SYNC_BLOCK to WAL create calls; warn about synconclose flag when appropriate
Modified:
accumulo/trunk/ (props changed)
accumulo/trunk/assemble/ (props changed)
accumulo/trunk/core/ (props changed)
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
accumulo/trunk/examples/ (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (props changed)
accumulo/trunk/pom.xml (props changed)
accumulo/trunk/proxy/README (props changed)
accumulo/trunk/server/ (props changed)
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
accumulo/trunk/src/ (props changed)
Propchange: accumulo/trunk/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5:r1481854
Propchange: accumulo/trunk/assemble/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/assemble:r1481854
Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/core:r1481126,1481854
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1481867&r1=1481866&r2=1481867&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Mon May 13 14:25:13 2013
@@ -201,6 +201,8 @@ public enum Property {
TSERV_ARCHIVE_WALOGS("tserver.archive.walogs", "false", PropertyType.BOOLEAN, "Keep copies of the WALOGs for debugging purposes"),
TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
"The number of threads for the distributed workq. These threads are used for copying failed bulk files."),
+ TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN,
+ "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents problems recovering from sudden system resets."),
// properties that are specific to logger server behavior
LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),
Propchange: accumulo/trunk/examples/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/examples:r1481126,1481854
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1481126,1481854
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1481126,1481854
Propchange: accumulo/trunk/pom.xml
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/pom.xml:r1481854
Propchange: accumulo/trunk/proxy/README
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/proxy/README:r1481126,1481854
Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/server:r1481126,1481854
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1481867&r1=1481866&r2=1481867&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon May 13 14:25:13 2013
@@ -3253,6 +3253,16 @@ public class TabletServer extends Abstra
log.fatal(msg);
System.exit(-1);
}
+ try {
+ // if this class exists
+ Class.forName("org.apache.hadoop.fs.CreateFlag");
+ // we're running hadoop 2.0, 1.1
+ if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+ log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
+ }
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ }
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1481867&r1=1481866&r2=1481867&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon May 13 14:25:13 2013
@@ -25,9 +25,11 @@ import static org.apache.accumulo.server
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,10 +48,15 @@ import org.apache.accumulo.server.logger
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tabletserver.TabletMutations;
+import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.fs.CreateFlag;
+//import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
/**
@@ -106,8 +113,8 @@ public class DfsLogger {
synchronized (closeLock) {
if (!closed) {
try {
- logFile.sync();
- } catch (IOException ex) {
+ sync.invoke(logFile);
+ } catch (Exception ex) {
log.warn("Exception syncing " + ex);
for (DfsLogger.LogWork logWork : work) {
logWork.exception = ex;
@@ -202,6 +209,7 @@ public class DfsLogger {
private final ServerResources conf;
private FSDataOutputStream logFile;
private DataOutputStream encryptingLogFile = null;
+ private Method sync;
private Path logPath;
private String logger;
@@ -257,7 +265,23 @@ public class DfsLogger {
int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
blockSize -= blockSize % checkSum;
blockSize = Math.max(blockSize, checkSum);
- logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+ if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
+ logFile = create(fs, logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+ else
+ logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+
+ try {
+ // sync: send data to datanodes
+ sync = logFile.getClass().getMethod("sync");
+ try {
+ // hsync: send data to datanodes and sync the data to disk
+ sync = logFile.getClass().getMethod("hsync");
+ } catch (NoSuchMethodException ex) {
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
// Initialize the crypto operations.
@SuppressWarnings("deprecation")
@@ -304,6 +328,43 @@ public class DfsLogger {
t.start();
}
+ /**
+  * Creates the log file, asking for the SYNC_BLOCK create flag when the Hadoop version on the classpath provides it.
+  * <p>
+  * org.apache.hadoop.fs.CreateFlag is only reached through reflection so this class keeps no compile-time reference to it. Written directly, the
+  * reflective path would read:
+  *
+  * <pre>
+  * EnumSet&lt;CreateFlag&gt; set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+  * return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+  * </pre>
+  *
+  * Whenever the flag-based create cannot be used (CreateFlag class absent, SYNC_BLOCK or CREATE constant absent, no matching create method, or the
+  * reflective call throws), the file is created with fs.create(logPath, b, buffersize, replication, blockSize) on the file system that was passed in.
+  *
+  * @param fs
+  *          file system to create the file on; a TraceFileSystem is unwrapped for the reflective lookup and call only
+  * @param logPath
+  *          path of the file to create
+  * @param b
+  *          forwarded as the second argument of the fallback fs.create call (presumably Hadoop's overwrite flag -- confirm); the flag-based call does not
+  *          consult it and always passes SYNC_BLOCK and CREATE
+  * @param buffersize
+  *          buffer size handed to the create call
+  * @param replication
+  *          replication handed to the create call
+  * @param blockSize
+  *          block size handed to the create call
+  * @return the output stream returned by whichever create call succeeded
+  * @throws IOException
+  *           if the fallback fs.create call fails
+  */
+ private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
+   try {
+     Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+     Enum<?> syncBlock = null;
+     Enum<?> createFlag = null;
+     if (createFlags.isEnum()) {
+       for (Object constant : createFlags.getEnumConstants()) {
+         if (constant.toString().equals("SYNC_BLOCK")) {
+           syncBlock = (Enum<?>) constant;
+           log.debug("Found synch enum " + constant);
+         }
+         if (constant.toString().equals("CREATE")) {
+           createFlag = (Enum<?>) constant;
+           log.debug("Found CREATE enum " + constant);
+         }
+       }
+     }
+     // Check for the flags explicitly instead of relying on an IndexOutOfBoundsException to reach the fallback.
+     if (syncBlock == null || createFlag == null) {
+       log.debug("CreateFlag does not define both SYNC_BLOCK and CREATE; creating " + logPath + " without the sync flag");
+       return fs.create(logPath, b, buffersize, replication, blockSize);
+     }
+     Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, syncBlock, createFlag);
+     log.debug("CreateFlag set: " + set);
+     // Unwrap into a separate local: the fallback calls in the catch blocks must still go through the file system the caller supplied.
+     FileSystem impl = fs;
+     if (fs instanceof TraceFileSystem) {
+       impl = ((TraceFileSystem) fs).getImplementation();
+     }
+     Method create = impl.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+     log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+     return (FSDataOutputStream) create.invoke(impl, logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+   } catch (ClassNotFoundException ex) {
+     // Expected in hadoop 1.0
+     return fs.create(logPath, b, buffersize, replication, blockSize);
+   } catch (Exception ex) {
+     // Any other reflective failure: log it and create the file without the sync flag.
+     log.debug(ex, ex);
+     return fs.create(logPath, b, buffersize, replication, blockSize);
+   }
+ }
+
/*
* (non-Javadoc)
*
Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/src:r1481126,1481854