You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/29 22:09:44 UTC

svn commit: r789430 - in /activemq/sandbox/activemq-flow: activemq-util/ activemq-util/src/main/java/org/apache/activemq/util/ activemq-util/src/main/java/org/apache/activemq/util/os/ kahadb/src/main/java/org/apache/kahadb/journal/ kahadb/src/main/java...

Author: chirino
Date: Mon Jun 29 20:09:44 2009
New Revision: 789430

URL: http://svn.apache.org/viewvc?rev=789430&view=rev
Log:
Using the fcntcl(F_FULLFSYNC) call on OS X to safely sync data on to the disk patters. (Only enabled when the JNA lib is in the classpath)


Added:
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/CLibrary.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/JnaCLibrary.java
Modified:
    activemq/sandbox/activemq-flow/activemq-util/pom.xml
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DiskBenchmark.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/sandbox/activemq-flow/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/pom.xml?rev=789430&r1=789429&r2=789430&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-util/pom.xml Mon Jun 29 20:09:44 2009
@@ -31,6 +31,16 @@
 
   <name>ActiveMQ :: Util</name>
 
+
+  <repositories>
+    <!-- For the JNA dependency -->
+    <repository>
+      <id>maven2-repository.dev.java.net</id>
+      <name>Java.net Maven  Repository </name>
+      <url>http://download.java.net/maven/2</url>
+    </repository>
+  </repositories>
+  
   <dependencies>
                 
     <dependency>
@@ -43,6 +53,12 @@
       <artifactId>commons-logging</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>net.java.dev.jna</groupId>
+      <artifactId>jna</artifactId>
+      <version>3.1.0</version>
+      <optional>true</optional>
+    </dependency>
                 
     <!-- Testing Dependencies -->    
     <dependency>

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DiskBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DiskBenchmark.java?rev=789430&r1=789429&r2=789430&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DiskBenchmark.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DiskBenchmark.java Mon Jun 29 20:09:44 2009
@@ -235,9 +235,9 @@
             }
             // Sync to disk so that the we actually write the data to disk.. otherwise 
             // OS buffering might not really do the write.
-            raf.getFD().sync();
+            IOHelper.sync(raf.getFD());
         }
-        raf.getFD().sync();
+        IOHelper.sync(raf.getFD());
         raf.close();
         now = System.currentTimeMillis();
         
@@ -256,7 +256,7 @@
             for( long i=0; i+data.length < size; i+=data.length) {
                 raf.seek(i);
                 raf.write(data);
-                raf.getFD().sync();
+                IOHelper.sync(raf.getFD());
                 ioCount++;
                 now = System.currentTimeMillis();
                 if( (now-start)>sampleInterval ) {

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java?rev=789430&r1=789429&r2=789430&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java Mon Jun 29 20:09:44 2009
@@ -17,12 +17,15 @@
 package org.apache.activemq.util;
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Field;
+
+import org.apache.activemq.util.os.CLibrary;
 
 /**
  * @version $Revision: 661435 $
@@ -184,4 +187,63 @@
             }
         }
     }
+    
+	public interface IOStrategy {
+		void sync(FileDescriptor fdo) throws IOException;
+	}	
+	
+	static final IOStrategy IO_STRATEGY = createIOStrategy();
+	
+	private static IOStrategy createIOStrategy() {
+		
+		// On OS X, the fsync system call does not fully flush the hardware buffers.. 
+		// to do that you have to do an fcntl call, and the only way to do that is to
+		// do some JNI.  
+		String os = System.getProperty("os.name");
+		if( "Mac OS X".equals(os) ) {
+
+			// We will gracefully fall back to default JDK file sync behavior
+			// if the JNA library is not in the path, and we can't set the 
+			// FileDescriptor.fd field accessible.
+			try {
+				final Field field = FileDescriptor.class.getDeclaredField("fd");
+				field.setAccessible(true);
+				// Try to dynamically load the JNA impl of the CLibrary interface..
+				final CLibrary lib = getCLibrary();
+				return new IOStrategy() {
+					static final int F_FULLFSYNC = 51;	
+					public void sync(FileDescriptor fd) throws IOException {
+						try {
+							int id = field.getInt(fd);
+							lib.fcntl(id, F_FULLFSYNC);
+						} catch (Exception e) {
+							throw IOExceptionSupport.create(e);
+						}
+					}
+				};
+			} catch (Exception ignore) {
+				ignore.printStackTrace();
+				// Perhaps we should issue a warning here so folks know that 
+				// the disk syncs are not going to be of very good quality.
+			}
+		}
+		
+		return new IOStrategy() {
+			public void sync(FileDescriptor fd) throws IOException {
+				fd.sync();
+			}
+		};
+	}
+
+	@SuppressWarnings("unchecked")
+	public static CLibrary getCLibrary() throws ClassNotFoundException, IllegalAccessException, NoSuchFieldException {
+		Class clazz = IOHelper.class.getClassLoader().loadClass("org.apache.activemq.util.os.JnaCLibrary");
+		final CLibrary lib = (CLibrary) clazz.getField("INSTANCE").get(null);
+		return lib;
+	}
+	
+	static public void sync(FileDescriptor fd) throws IOException {
+		IO_STRATEGY.sync(fd);
+	}
+
 }

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/CLibrary.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/CLibrary.java?rev=789430&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/CLibrary.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/CLibrary.java Mon Jun 29 20:09:44 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util.os;
+
+public interface CLibrary {
+    void fcntl(int fd, int cmd, Object... args);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/JnaCLibrary.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/JnaCLibrary.java?rev=789430&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/JnaCLibrary.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/os/JnaCLibrary.java Mon Jun 29 20:09:44 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util.os;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+
+public interface JnaCLibrary extends Library, CLibrary {
+	JnaCLibrary INSTANCE = (JnaCLibrary) Native.loadLibrary("c", JnaCLibrary.class);
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=789430&r1=789429&r2=789430&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java Mon Jun 29 20:09:44 2009
@@ -20,6 +20,7 @@
 import java.io.RandomAccessFile;
 import java.util.Map;
 
+import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
 import org.apache.kahadb.journal.DataFileAppender.WriteKey;
@@ -151,7 +152,7 @@
         int size = Math.min(data.getLength(), location.getSize());
         file.write(data.getData(), data.getOffset(), size);
         if (sync) {
-            file.getFD().sync();
+            IOHelper.sync(file.getFD());
         }
 
     }

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=789430&r1=789429&r2=789430&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Mon Jun 29 20:09:44 2009
@@ -24,6 +24,7 @@
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
+import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
@@ -376,7 +377,7 @@
                 }
                 
                 if (forceToDisk) {
-                    file.getFD().sync();
+                    IOHelper.sync(file.getFD());
                 }
 
                 WriteCommand lastWrite = wb.writes.getTail();

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=789430&r1=789429&r2=789430&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Jun 29 20:09:44 2009
@@ -547,10 +547,10 @@
         // So we don't loose it.. write it 2 times...
         writeFile.seek(0);
         writeFile.write(d);
-        writeFile.getFD().sync();
+        IOHelper.sync(writeFile.getFD());
         writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
         writeFile.write(d);
-        writeFile.getFD().sync();
+        IOHelper.sync(writeFile.getFD());
     }
 
     private void storeFreeList() throws IOException {
@@ -988,7 +988,7 @@
             
             if (enableDiskSyncs) {
                 // Sync to make sure recovery buffer writes land on disk..
-                recoveryFile.getFD().sync();
+                IOHelper.sync(recoveryFile.getFD());
             }
             
             recoveryPageCount = batch.size();
@@ -1002,7 +1002,7 @@
         
         // Sync again
         if( enableDiskSyncs ) {
-            writeFile.getFD().sync();
+            IOHelper.sync(writeFile.getFD());
         }
         
         synchronized( writes ) {
@@ -1094,7 +1094,7 @@
         }
         
         // And sync it to disk
-        writeFile.getFD().sync();
+        IOHelper.sync(writeFile.getFD());
         return nextTxId;
     }