You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/11/19 18:26:42 UTC

svn commit: r882212 - in /activemq/sandbox/activemq-apollo/activemq-syscall/src: main/java/org/apache/activemq/syscall/ main/java/org/apache/activemq/syscall/jni/ test/java/org/apache/activemq/syscall/ test/java/org/apache/activemq/syscall/jni/

Author: chirino
Date: Thu Nov 19 17:26:41 2009
New Revision: 882212

URL: http://svn.apache.org/viewvc?rev=882212&view=rev
Log:
Adding a higher level FileDescriptor interface for working with AIO.


Added:
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java
Modified:
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java?rev=882212&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java Thu Nov 19 17:26:41 2009
@@ -0,0 +1,148 @@
+package org.apache.activemq.syscall;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.syscall.jni.AIO;
+
+import static java.util.concurrent.TimeUnit.*;
+
+import static org.apache.activemq.syscall.jni.AIO.*;
+import static org.apache.activemq.syscall.jni.CLibrary.*;
+import static org.apache.activemq.syscall.jni.AIO.timespec.*;
+
+public class AioPollAgent {
+    
+    /** Milliseconds an idle poll thread waits in dequeueBlocks() for new work before it exits. */
+    private static final int ASYNC_THREAD_SHUTDOWN_DELAY = 1000;
+    /** Shared agent instance handed out by {@link #getMainPollAgent()}. */
+    public static final AioPollAgent MAIN_POLL_AGENT = new AioPollAgent();
+    /**
+     * @return the shared {@link #MAIN_POLL_AGENT} instance
+     */
+    public static AioPollAgent getMainPollAgent() {
+        return MAIN_POLL_AGENT;
+    }
+
+    /** Native pointer to the malloc'ed timespec passed to aio_suspend; compared against NULL to detect "not yet allocated". */
+    long timeoutp;
+    /** Watched aio control block pointers mapped to their callbacks; read and written while synchronized on this. */
+    LinkedHashMap<Long, Callback<Long>> callbackMap = new LinkedHashMap<Long, Callback<Long>>();
+    /** The poll thread started by watch(), reset to null when that thread decides to exit. */
+    private Thread thread;
+    
+    /**
+     * Stores the given timeout in the native timespec used when polling,
+     * allocating that native structure on first use.
+     *
+     * @param value the timeout amount
+     * @param unit the unit of the timeout amount
+     * @throws OutOfMemoryError if the native timespec cannot be allocated
+     */
+    public void setTimeOut(long value, TimeUnit unit) {
+        synchronized (this) {
+            if( NULL == timeoutp ) {
+                final long allocated = malloc(AIO.timespec.SIZEOF);
+                if( NULL == allocated ) {
+                    throw new OutOfMemoryError();
+                }
+                timeoutp = allocated;
+            }
+            final timespec requested = timespec(value, unit);
+            memmove(timeoutp, requested, AIO.timespec.SIZEOF);
+        }
+    }
+    
+    /**
+     * Builds a timespec for a duration expressed in an arbitrary time unit.
+     *
+     * @param value the duration amount
+     * @param unit the unit of the duration; it is converted to milliseconds
+     * @return a new timespec holding the duration
+     */
+    public static timespec timespec(long value, TimeUnit unit) {
+        final long millis = unit.toMillis(value);
+        return timespec(millis);
+    }
+    
+    /**
+     * Builds a timespec for a duration expressed in milliseconds.
+     *
+     * @param ms the duration in milliseconds
+     * @return a new timespec whose tv_sec holds the whole seconds and whose
+     *         tv_nsec holds the remaining milliseconds converted to nanoseconds
+     */
+    static public timespec timespec(long ms) {
+        timespec t = new timespec();
+        t.tv_sec = ms / 1000;
+        // The sub-second remainder is in milliseconds; 1 ms == 1,000,000 ns.
+        t.tv_nsec = (ms % 1000) * 1000000L;
+        return t;
+    }
+    
+    /**
+     * Registers a callback for a pending asynchronous IO request and makes
+     * sure a poll thread is running to service it.
+     *
+     * @param aiocbp native pointer to the aio control block of the request; asserted to be non-NULL
+     * @param callback the callback stored in the callback map for this control block
+     */
+    public void watch(long aiocbp, Callback<Long> callback) {
+        assert aiocbp!=NULL;
+        synchronized (this) {
+            // Start a poll thread only when nothing is being watched and no thread exists yet.
+            if( callbackMap.isEmpty() && thread==null) {
+                if( timeoutp==NULL ) {
+                    // Default the timeout..
+                    setTimeOut(10, MILLISECONDS);
+                }
+                thread = new Thread("AioPollAgent") {
+                    public void run() {
+                        process();
+                    }
+                };
+                thread.setDaemon(false);
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.start();
+            }
+            callbackMap.put(aiocbp, callback);
+            // Wake the poll thread in case it is waiting in dequeueBlocks() for new work.
+            notify();
+        }
+    }
+
+    /**
+     * Poll thread main loop: keeps polling the currently watched control
+     * blocks until dequeueBlocks() signals shutdown by returning null.
+     */
+    private void process() {
+        for (long[] pending = dequeueBlocks(); pending != null; pending = dequeueBlocks()) {
+            process(pending);
+        }
+    }
+
+    /**
+     * Takes a snapshot of the control block pointers currently being watched.
+     * When nothing is being watched it first waits up to
+     * ASYNC_THREAD_SHUTDOWN_DELAY milliseconds for work to arrive.
+     *
+     * @return the watched aiocb pointers, or null (after resetting the thread
+     *         field) when the wait ended with nothing to watch or was interrupted
+     */
+    private long[] dequeueBlocks() {
+        long blocks[];
+        synchronized (this) {
+            if( callbackMap.isEmpty() ) {
+                try {
+                    // watch() calls notify() after registering a callback, which ends this wait early.
+                    wait(ASYNC_THREAD_SHUTDOWN_DELAY);
+                    if( callbackMap.isEmpty() ) {
+                        // Still idle: clear the thread field so watch() starts a fresh thread later.
+                        thread=null;
+                        return null;
+                    }
+                } catch (InterruptedException e) {
+                    thread=null;
+                    return null;
+                } 
+            }
+            // Copy the keys into a primitive array while still holding the lock.
+            Set<Long> keys = callbackMap.keySet();
+            blocks = new long[keys.size()];
+            int i=0;
+            for (Long value : keys) {
+                blocks[i++]=value;
+            }
+        }
+        return blocks;
+    }
+
+    /**
+     * Waits (bounded by the configured timeout) for any of the given requests
+     * to complete, then notifies the callback of every request that finished.
+     *
+     * @param aiocbps native pointers to the aio control blocks to poll
+     */
+    private void process(long[] aiocbps) {
+        if (aio_suspend(aiocbps, aiocbps.length, timeoutp) != 0) {
+            // Nothing completed (e.g. the timeout expired); the caller polls again.
+            return;
+        }
+        for (int i = 0; i < aiocbps.length; i++) {
+            long aiocbp = aiocbps[i];
+            int status = aio_error(aiocbp);
+            if( status==EINPROGRESS ) {
+                continue;
+            }
+
+            // aio_error reports its own failure as -1 with the cause in errno;
+            // otherwise its return value already is the request's error number.
+            int error = status==-1 ? errno() : status;
+
+            // The io has now completed.. remove it from the callback map and
+            // collect its result BEFORE freeing the control block: aio_return
+            // reads the block, so calling it after free() is a use-after-free.
+            Callback<Long> callback = removeBlock(aiocbp);
+            long result = aio_return(aiocbp);
+            free(aiocbp);
+
+            // Let the callback know that the IO completed.
+            try {
+                if( error==0 ) {
+                    callback.onSuccess(result);
+                } else {
+                    callback.onFailure(new IOException(string(strerror(error))));
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Unregisters the callback associated with a control block.
+     *
+     * @param aiocbp native pointer to the aio control block
+     * @return the callback that was registered for it, or null if there was none
+     */
+    private Callback<Long> removeBlock(long aiocbp) {
+        final Long key = Long.valueOf(aiocbp);
+        synchronized (this) {
+            return callbackMap.remove(key);
+        }
+    }
+
+    /**
+     * Releases the native timespec allocated by setTimeOut, if any, and then
+     * chains to the superclass finalizer.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            synchronized(this) {
+                if ( timeoutp!=NULL ) {
+                    free(timeoutp);
+                    timeoutp=NULL;
+                }
+            }
+        } finally {
+            // Always give the superclass a chance to clean up, even if free() fails.
+            super.finalize();
+        }
+    }
+    
+}

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java?rev=882212&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java Thu Nov 19 17:26:41 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.syscall;
+
+
+/**
+ * Receives the outcome of an asynchronous operation: exactly one of the two
+ * methods is meant to be invoked by whoever completes the operation.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * @param <T> the type of the result delivered on success
+ */
+public interface Callback<T> {
+
+    /**
+     * Invoked when the operation completed successfully.
+     *
+     * @param result the result of the operation
+     */
+    void onSuccess(T result);
+
+    /**
+     * Invoked when the operation failed.
+     *
+     * @param exception the cause of the failure
+     */
+    void onFailure(Throwable exception);
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java?rev=882212&r1=882211&r2=882212&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java Thu Nov 19 17:26:41 2009
@@ -16,12 +16,13 @@
  */
 package org.apache.activemq.syscall;
 
+import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 
 import org.apache.activemq.syscall.jni.IO;
+import org.apache.activemq.syscall.jni.AIO.aiocb;
 
+import static org.apache.activemq.syscall.jni.AIO.*;
 import static org.apache.activemq.syscall.jni.CLibrary.*;
 
 /**
@@ -32,11 +33,16 @@
 
     private final int fd;
     boolean opened;
+    private AioPollAgent aioPollAgent;
 
     public FileDescriptor(int fd) {
         this.fd = fd;
     }
 
+    /**
+     * Opens the given file by delegating to {@link #open(String, int, int)}
+     * with the file's path.
+     *
+     * @param file the file to open
+     * @param oflags the open flags passed through unchanged
+     * @param mode the mode bits passed through unchanged
+     * @return the opened file descriptor
+     * @throws IOException if the underlying open fails
+     */
+    public static FileDescriptor open(File file, int oflags, int mode) throws IOException {
+        final String path = file.getPath();
+        return open(path, oflags, mode);
+    }
+    
     public static FileDescriptor open(String path, int oflags, int mode) throws IOException {
         int fd = IO.open(path, oflags, mode);
         if( fd== -1 ) {
@@ -72,8 +78,55 @@
         return fd;
     }
 
-    public void write(NativeAllocation writeBuffer, Callable<Integer> callback) {
+    /**
+     * does an async write, the callback gets executed once the write completes.
+     * 
+     * @param offset the file offset stored in the request's aio_offset field
+     * @param buffer the native buffer whose pointer and length describe the data to write
+     * @param callback registered with the poll agent to be notified of the outcome
+     * @throws IOException if the write request could not be queued
+     * @throws OutOfMemoryError if the native control block cannot be allocated
+     */
+    public void write(long offset, NativeAllocation buffer, Callback<Long> callback) throws IOException {
+        
+        aiocb cb = new aiocb();
+        cb.aio_fildes = fd;
+        cb.aio_offset = offset;
+        cb.aio_buf = buffer.pointer();        
+        cb.aio_nbytes = buffer.length();
+
+        long aiocbp = malloc(aiocb.SIZEOF);
+        if( aiocbp==NULL ) {
+            throw new OutOfMemoryError();
+        }
+        aiocb.memmove(aiocbp, cb, aiocb.SIZEOF);
+
+        // A request that was never queued will never complete, so it must not be
+        // handed to the poll agent: release the control block and report the error.
+        if( aio_write(aiocbp) == -1 ) {
+            // Capture the reason before free() gets a chance to disturb errno.
+            String reason = string(strerror(errno()));
+            free(aiocbp);
+            throw new IOException("aio_write failed: "+reason);
+        }
+
+        AioPollAgent agent = getAioPollAgent();
+        agent.watch(aiocbp, callback);
         return;
     }
     
+    /**
+     * @return the configured poll agent, defaulting (and remembering the
+     *         default) to the main poll agent when none has been set
+     */
+    private AioPollAgent getAioPollAgent() {
+        if( aioPollAgent!=null ) {
+            return aioPollAgent;
+        }
+        aioPollAgent = AioPollAgent.getMainPollAgent();
+        return aioPollAgent;
+    }
+
+    /**
+     * Sets the poll agent used to watch this descriptor's async requests.
+     *
+     * @param aioPollAgent the agent to use; if it is null the main poll agent
+     *        is picked up the next time an agent is needed
+     */
+    public void setAioPollAgent(AioPollAgent aioPollAgent) {
+        this.aioPollAgent = aioPollAgent;
+    }
+
+    /**
+     * Intended to do an async read, with the callback executed once the read completes.
+     * NOTE(review): this is currently an empty stub -- it returns immediately
+     * without issuing any read and never invokes the callback.
+     * 
+     * @param offset presumably the file position to read from (unused for now) -- TODO confirm
+     * @param buffer presumably the native buffer to read into (unused for now) -- TODO confirm
+     * @param callback currently never invoked
+     */
+    public void read(long offset, NativeAllocation buffer, Callback<Long> callback) throws IOException {
+        
+
+        return;
+    }
+    
+    
+    
 }

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java?rev=882212&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java Thu Nov 19 17:26:41 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.syscall;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A Callback implementation which provides a Future interface: the future
+ * completes when either onSuccess or onFailure is invoked.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *
+ * @param <T> the type of the result
+ */
+public class FutureCallback<T> implements Callback<T>, Future<T> {
+        
+        /** Released once an outcome (result or exception) has been recorded. */
+        private final CountDownLatch done = new CountDownLatch(1);
+        volatile private Throwable exception;
+        volatile private T result;
+
+        /**
+         * Records the failure and completes the future.
+         *
+         * @param exception the cause reported by get() wrapped in an ExecutionException
+         */
+        public void onFailure(Throwable exception) {
+            this.exception = exception;
+            done.countDown();
+        }
+
+        /**
+         * Records the result and completes the future.
+         *
+         * @param result the value returned by get()
+         */
+        public void onSuccess(T result) {
+            this.result = result;
+            done.countDown();
+        }
+
+        /**
+         * Blocks until an outcome has been recorded.
+         *
+         * @return the recorded result
+         * @throws InterruptedException if the wait is interrupted
+         * @throws ExecutionException wrapping the recorded failure, if any
+         */
+        public T get() throws InterruptedException, ExecutionException {
+            done.await();
+            return internalGet();
+        }
+
+        /**
+         * Blocks until an outcome has been recorded or the timeout elapses.
+         *
+         * @return the recorded result
+         * @throws InterruptedException if the wait is interrupted
+         * @throws ExecutionException wrapping the recorded failure, if any
+         * @throws TimeoutException if no outcome was recorded in time
+         */
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            if ( done.await(timeout, unit) ) {
+                return internalGet();
+            }
+            throw new TimeoutException();
+        }
+        
+        /**
+         * @return true once onSuccess or onFailure has been invoked
+         */
+        public boolean isDone() {
+            return done.getCount()==0;
+        }
+        
+        /**
+         * This future cannot be canceled, so the request is always refused.
+         *
+         * @return always false
+         */
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        /**
+         * This future cannot be canceled, so it is never in the canceled state.
+         * Returning false (rather than throwing) keeps the class usable by
+         * generic code that inspects any Future.
+         * 
+         * @return always false
+         */
+        public boolean isCancelled() {
+            return false;
+        }
+        
+        /** Returns the recorded result or rethrows the recorded failure. */
+        private T internalGet() throws ExecutionException {
+            if( exception!=null ) {
+                throw new ExecutionException(exception);
+            }
+            return result;
+        }
+    }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java?rev=882212&r1=882211&r2=882212&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java Thu Nov 19 17:26:41 2009
@@ -105,9 +105,9 @@
         public static int SIZEOF;
         
         @JniField(cast="time_t")
-        long tv_sec;  
+        public long tv_sec;  
         @JniField(cast="long")
-        long tv_nsec;
+        public long tv_nsec;
         
         @JniMethod(conditional="#ifdef HAVE_AIO_H")
         public static final native void memmove (

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java?rev=882212&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java Thu Nov 19 17:26:41 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.syscall;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.activemq.syscall.jni.AIO;
+import org.junit.Test;
+
+import static org.apache.activemq.syscall.NativeAllocation.*;
+import static org.apache.activemq.syscall.TestSupport.*;
+import static org.apache.activemq.syscall.jni.IO.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class FileDescriptorTest {
+    
+    /**
+     * Writes a generated string through the callback based async write and
+     * verifies both the reported byte count and the resulting file content.
+     */
+    @Test
+    public void writeWithACallback() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+        assumeThat(AIO.SUPPORTED, is(true));
+        
+        String expected = generateString(1024*4);
+        NativeAllocation buffer = allocate(expected);
+
+        File file = dataFile(FileDescriptorTest.class.getName()+".writeWithACallback.data");
+        
+        int oflags = O_NONBLOCK | O_CREAT | O_TRUNC | O_RDWR;
+        int mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
+        FileDescriptor fd = FileDescriptor.open(file, oflags, mode);
+        
+        try {
+            FutureCallback<Long> future = new FutureCallback<Long>();
+            fd.write(0, buffer, future);
+            long count = future.get(1, TimeUnit.SECONDS);
+            
+            // JUnit's contract is assertEquals(expected, actual): the buffer
+            // length is the expectation, the reported count is the actual value.
+            assertEquals(buffer.length(), count);
+        } finally {
+            fd.dispose();
+        }
+
+        assertEquals(expected, readFile(file));
+    }
+    
+}

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java?rev=882212&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java Thu Nov 19 17:26:41 2009
@@ -0,0 +1,54 @@
+package org.apache.activemq.syscall;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Small helpers shared by the syscall tests: test data generation and
+ * simple whole-file reads and writes.
+ */
+public class TestSupport {
+
+    /**
+     * Generates a string of the requested size that cycles through 'a'..'z'.
+     *
+     * @param size the number of characters to generate
+     * @return the generated string
+     */
+    public static String generateString(int size) {
+        StringBuilder sb = new StringBuilder(size);
+        for( int i=0; i < size; i++ ) {
+            sb.append((char)('a'+(i%26)));
+        }
+        return sb.toString();
+    }
+    
+    /**
+     * Resolves a test data file under ${basedir}/target/test-data, creating
+     * that directory if needed.
+     *
+     * @param name the file name, using '/' as separator
+     * @return the resolved file (the file itself is not created)
+     */
+    public static File dataFile(String name) {
+        // Strings are immutable: the converted name has to be captured, the
+        // original code discarded it and so never converted the separators.
+        String localName = name.replace('/', File.separatorChar);
+        String basedir = System.getProperty("basedir", "."); 
+        File dir = new File(new File(new File(basedir), "target"), "test-data");
+        dir.mkdirs();
+        return new File(dir, localName);
+    }
+    
+    /**
+     * Reads the whole file and decodes it with the platform default charset.
+     *
+     * @param file the file to read
+     * @return the file content
+     * @throws FileNotFoundException if the file cannot be opened
+     * @throws IOException if reading fails
+     */
+    static public String readFile(File file) throws FileNotFoundException, IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        FileInputStream is = new FileInputStream(file);
+        try {
+            // Read in chunks; a byte-at-a-time read on an unbuffered stream
+            // costs one system call per byte.
+            byte[] chunk = new byte[4096];
+            int count;
+            while( (count=is.read(chunk))>=0 ) {
+                baos.write(chunk, 0, count);
+            }
+        } finally {
+            is.close();
+        }
+        return new String(baos.toByteArray());
+    }
+
+    /**
+     * Replaces the file's content with the given string, encoded with the
+     * platform default charset.
+     *
+     * @param file the file to write
+     * @param content the content to store
+     * @throws FileNotFoundException if the file cannot be opened for writing
+     * @throws IOException if writing fails
+     */
+    static public void writeFile(File file, String content) throws FileNotFoundException, IOException {
+        FileOutputStream os = new FileOutputStream(file);
+        try {
+            os.write(content.getBytes());
+        } finally {
+            os.close();
+        }
+    }
+    
+}

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java?rev=882212&r1=882211&r2=882212&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java Thu Nov 19 17:26:41 2009
@@ -13,25 +13,21 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */package org.apache.activemq.syscall.jni;
+ */
+package org.apache.activemq.syscall.jni;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.apache.activemq.syscall.NativeAllocation;
-import org.apache.activemq.syscall.jni.AIO;
 import org.apache.activemq.syscall.jni.AIO.aiocb;
 import org.junit.Test;
 
+import static org.apache.activemq.syscall.NativeAllocation.*;
+import static org.apache.activemq.syscall.TestSupport.*;
 import static org.apache.activemq.syscall.jni.AIO.*;
 import static org.apache.activemq.syscall.jni.CLibrary.*;
 import static org.apache.activemq.syscall.jni.IO.*;
-
-import static org.apache.activemq.syscall.NativeAllocation.*;
 import static org.hamcrest.CoreMatchers.*;
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
@@ -47,16 +43,9 @@
     public void write() throws IOException, InterruptedException {
     	assumeThat(AIO.SUPPORTED, is(true));
     	 
-        File file = new File("target/test-data/test.data");
-        file.getParentFile().mkdirs();
+        File file = dataFile(AIOTest.class.getName()+".write.data");
 
-        // Setup a buffer holds the data that we will be writing..
-        StringBuffer sb = new StringBuffer();
-        for( int i=0; i < 1024*4; i++ ) {
-            sb.append((char)('a'+(i%26)));
-        }
-                
-        String expected = sb.toString();
+        String expected = generateString(1024*4);
         NativeAllocation writeBuffer = allocate(expected);
 
         long aiocbp = malloc(aiocb.SIZEOF);
@@ -64,8 +53,9 @@
 
         try {
             // open the file...
+            int oflags = O_NONBLOCK | O_CREAT | O_TRUNC| O_RDWR;
             int mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
-            int fd = open(file.getCanonicalPath(), O_NONBLOCK | O_CREAT | O_TRUNC| O_RDWR, mode);
+            int fd = open(file.getCanonicalPath(), oflags, mode);
             checkrc(fd);
             
             // Create a control block..
@@ -109,35 +99,10 @@
             }
         }
         
-        // Read the file in and verify the contents is what we expect 
-        String actual = loadContent(file);
-        assertEquals(expected, actual);
+        assertEquals(expected, readFile(file));
     }
 
-    private String loadContent(File file) throws FileNotFoundException, IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        FileInputStream is = new FileInputStream(file);
-        try {
-            int c=0;
-            while( (c=is.read())>=0 ) {
-                baos.write(c);
-            }
-        } finally {
-            is.close();
-        }
-        String actual = new String(baos.toByteArray());
-        return actual;
-    }
 
-    private void storeContent(File file, String content) throws FileNotFoundException, IOException {
-        FileOutputStream os = new FileOutputStream(file);
-        try {
-            os.write(content.getBytes());
-        } finally {
-            os.close();
-        }
-    }
-    
     private void checkrc(int rc) throws IOException {
         if( rc==-1 ) {
             throw new IOException("IO failure: "+string(strerror(errno())));