You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/05/13 16:02:13 UTC

svn commit: r1481854 - in /accumulo/branches/1.5: core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/tabletserver/ server/src/main/java/org/apache/accumulo/server/tabletserver/log/

Author: ecn
Date: Mon May 13 14:02:12 2013
New Revision: 1481854

URL: http://svn.apache.org/r1481854
Log:
ACCUMULO-1328 add SYNC_BLOCK to WAL create calls; warn about synconclose flag when appropriate

Modified:
    accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java

Modified: accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1481854&r1=1481853&r2=1481854&view=diff
==============================================================================
--- accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java Mon May 13 14:02:12 2013
@@ -201,6 +201,8 @@ public enum Property {
   TSERV_ARCHIVE_WALOGS("tserver.archive.walogs", "false", PropertyType.BOOLEAN, "Keep copies of the WALOGs for debugging purposes"),
   TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
       "The number of threads for the distributed workq.  These threads are used for copying failed bulk files."),
+  TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN, 
+      "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents problems recovering from sudden system resets."),
   
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1481854&r1=1481853&r2=1481854&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon May 13 14:02:12 2013
@@ -3256,6 +3256,16 @@ public class TabletServer extends Abstra
         log.fatal(msg);
         System.exit(-1);
       }
+      try {
+        // if this class exists
+        Class.forName("org.apache.hadoop.fs.CreateFlag");
+        // CreateFlag is present, so we're running hadoop 1.1 or 2.0
+        if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+          log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
+        }
+      } catch (ClassNotFoundException ex) {
+        // hadoop 1.0
+      }
     }
     
   }

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1481854&r1=1481853&r2=1481854&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon May 13 14:02:12 2013
@@ -25,9 +25,11 @@ import static org.apache.accumulo.server
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,10 +48,15 @@ import org.apache.accumulo.server.logger
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
+import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.fs.CreateFlag;
+//import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
 /**
@@ -106,8 +113,8 @@ public class DfsLogger {
         synchronized (closeLock) {
           if (!closed) {
             try {
-              logFile.sync();
-            } catch (IOException ex) {
+              sync.invoke(logFile);
+            } catch (Exception ex) {
               log.warn("Exception syncing " + ex);
               for (DfsLogger.LogWork logWork : work) {
                 logWork.exception = ex;
@@ -202,6 +209,7 @@ public class DfsLogger {
   private final ServerResources conf;
   private FSDataOutputStream logFile;
   private DataOutputStream encryptingLogFile = null;
+  private Method sync;
   private Path logPath;
   private String logger;
   
@@ -257,7 +265,23 @@ public class DfsLogger {
       int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
       blockSize -= blockSize % checkSum;
       blockSize = Math.max(blockSize, checkSum);
-      logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+      if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
+        logFile = create(fs, logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+      else
+        logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+      
+      try {
+        // sync: send data to datanodes
+        sync = logFile.getClass().getMethod("sync");
+        try {
+          // hsync: send data to datanodes and sync the data to disk
+          sync = logFile.getClass().getMethod("hsync");
+        } catch (NoSuchMethodException ex) {
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      
       
       // Initialize the crypto operations.
       @SuppressWarnings("deprecation")
@@ -304,6 +328,43 @@ public class DfsLogger {
     t.start();
   }
   
+  private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
+    try {
+      // This... 
+      //    EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+      //    return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+      // Becomes this:
+      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+      List<Enum<?>> flags = new ArrayList<Enum<?>>();
+      if (createFlags.isEnum()) {
+        for (Object constant : createFlags.getEnumConstants()) {
+          if (constant.toString().equals("SYNC_BLOCK")) {
+            flags.add((Enum<?>)constant);
+            log.debug("Found synch enum " + constant);
+          }
+          if (constant.toString().equals("CREATE")) {
+            flags.add((Enum<?>)constant);
+            log.debug("Found CREATE enum " + constant);
+          }
+        }
+      }
+      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+      log.debug("CreateFlag set: " + set);
+      if (fs instanceof TraceFileSystem) {
+        fs = ((TraceFileSystem)fs).getImplementation();
+      }
+      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+      return (FSDataOutputStream)create.invoke(fs, logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+    } catch (ClassNotFoundException ex) {
+      // Expected in hadoop 1.0
+      return fs.create(logPath, b, buffersize, replication, blockSize);
+    } catch (Exception ex) {
+      log.debug(ex, ex);
+      return fs.create(logPath, b, buffersize, replication, blockSize);
+    }
+  }
+
   /*
    * (non-Javadoc)
    *