You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/11/19 21:37:59 UTC

svn commit: r882286 - in /activemq/sandbox/activemq-apollo/activemq-syscall/src: main/java/org/apache/activemq/syscall/ main/java/org/apache/activemq/syscall/jni/ main/native-package/ main/native-package/src/ test/java/org/apache/activemq/syscall/ test...

Author: chirino
Date: Thu Nov 19 20:37:59 2009
New Revision: 882286

URL: http://svn.apache.org/viewvc?rev=882286&view=rev
Log:
Beefing up the io apis that the FileDescriptor exposes.


Added:
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java
Modified:
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/NativeAllocation.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/CLibrary.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/IO.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java Thu Nov 19 20:37:59 2009
@@ -18,94 +18,280 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 
+import org.apache.activemq.syscall.jni.AIO;
 import org.apache.activemq.syscall.jni.IO;
 import org.apache.activemq.syscall.jni.AIO.aiocb;
+import org.apache.activemq.syscall.jni.IO.iovec;
 
 import static org.apache.activemq.syscall.jni.AIO.*;
 import static org.apache.activemq.syscall.jni.CLibrary.*;
+import static org.apache.activemq.syscall.jni.IO.*;
+import static org.apache.activemq.syscall.jni.IO.iovec.*;
 
 /**
+ * Used to access a file descriptor.
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class FileDescriptor {
 
     private final int fd;
+
     boolean opened;
+
     private AioPollAgent aioPollAgent;
 
     public FileDescriptor(int fd) {
         this.fd = fd;
     }
 
+    public static FileDescriptor open(String path, int oflags) throws IOException {
+        return open(path, oflags, 0);
+    }
+
+    public static FileDescriptor open(File file, int oflags) throws IOException {
+        return open(file.getPath(), oflags, 0);
+    }
+
     public static FileDescriptor open(File file, int oflags, int mode) throws IOException {
-        return open(file.getPath(), oflags, mode); 
+        return open(file.getPath(), oflags, mode);
     }
-    
+
     public static FileDescriptor open(String path, int oflags, int mode) throws IOException {
         int fd = IO.open(path, oflags, mode);
-        if( fd== -1 ) {
-            throw new IOException(string(strerror(errno())));
+        if (fd == -1) {
+            throw error();
         }
         FileDescriptor rc = new FileDescriptor(fd);
         rc.opened = true;
         return rc;
     }
-    
+
     public int dispose() {
-        if(closeCheck()) {
+        if (closeCheck()) {
             return IO.close(fd);
         }
         return 0;
     }
 
     public void close() throws IOException {
-        if( dispose() == -1 ) {
-            throw new IOException(string(strerror(errno())));
+        if (dispose() == -1) {
+            throw error();
         }
     }
 
     private boolean closeCheck() {
-        if( opened ) {
-            opened=false;
+        if (opened) {
+            opened = false;
             return true;
         }
         return false;
     }
-    
+
     int getFD() {
         return fd;
     }
 
+    public long write(NativeAllocation buffer) throws IOException {
+        long rc = IO.write(fd, buffer.pointer(), buffer.length());
+        if (rc == -1) {
+            throw error();
+        }
+        return rc;
+    }
+
+    public long read(NativeAllocation buffer) throws IOException {
+        long rc = IO.read(fd, buffer.pointer(), buffer.length()); // read(2) into the buffer; rc is the byte count
+        if (rc == -1) {
+            throw error();
+        }
+        return rc;
+    }
+
+    public long write(long offset, NativeAllocation buffer) throws IOException {
+        long rc = IO.pwrite(fd, buffer.pointer(), buffer.length(), offset);
+        if (rc == -1) {
+            throw error();
+        }
+        return rc;
+    }
+
+    public long read(long offset, NativeAllocation buffer) throws IOException {
+        long rc = IO.pread(fd, buffer.pointer(), buffer.length(), offset); // positional read; rc is the byte count
+        if (rc == -1) {
+            throw error();
+        }
+        return rc;
+    }
+
+    public long write(Collection<NativeAllocation> buffers) throws IOException {
+        long iovecp = malloc(iovec.SIZEOF * buffers.size());
+        if (iovecp == NULL) {
+            throw new OutOfMemoryError();
+        }
+        try {
+            long cur = iovecp;
+            iovec v = new iovec();
+            for (NativeAllocation buffer : buffers) {
+                v.iov_base = buffer.pointer();
+                v.iov_len = buffer.length();
+                memmove(cur, v, iovec.SIZEOF);
+                cur = iovec_add(cur, 1);
+            }
+            long rc = IO.writev(fd, iovecp, buffers.size());
+            if (rc == -1) {
+                throw error();
+            }
+            return rc;
+        } finally {
+            free(iovecp);
+        }
+    }
+
+    public long read(Collection<NativeAllocation> buffers) throws IOException {
+        long iovecp = malloc(iovec.SIZEOF * buffers.size());
+        if (iovecp == NULL) {
+            throw new OutOfMemoryError();
+        }
+        try {
+            long cur = iovecp;
+            iovec v = new iovec();
+            for (NativeAllocation buffer : buffers) {
+                v.iov_base = buffer.pointer();
+                v.iov_len = buffer.length();
+                memmove(cur, v, iovec.SIZEOF);
+                cur = iovec_add(cur, 1);
+            }
+            long rc = IO.readv(fd, iovecp, buffers.size());
+            if (rc == -1) {
+                throw error();
+            }
+            return rc;
+        } finally {
+            free(iovecp);
+        }
+    }
+
+    public boolean isAsyncIOSupported() {
+        return AIO.SUPPORTED;
+    }
+
     /**
-     * does an async write, the callback gets executed once the write completes.
+     * Performs a non blocking write, the callback gets executed once the write
+     * completes. The buffer should not be read until the operation completes.
      * 
      * @param buffer
      * @param callback
      */
     public void write(long offset, NativeAllocation buffer, Callback<Long> callback) throws IOException {
-        
+        long aiocbp = block(offset, buffer);
+        int rc = aio_write(aiocbp);
+        if (rc == -1) {
+            free(aiocbp);
+            throw error();
+        }
+        agent().watch(aiocbp, callback);
+    }
+
+    static private IOException error() {
+        return new IOException(string(strerror(errno())));
+    }
+
+    /**
+     * Performs a non blocking read, the callback gets executed once the read
+     * completes. The buffer should not be modified until the operation
+     * completes.
+     * 
+     * @param buffer
+     * @param callback
+     */
+    public void read(long offset, NativeAllocation buffer, Callback<Long> callback) throws IOException {
+        long aiocbp = block(offset, buffer);
+        int rc = aio_read(aiocbp);
+        if (rc == -1) {
+            free(aiocbp);
+            throw error();
+        }
+        agent().watch(aiocbp, callback);
+    }
+
+    public void sync() {
+    }
+
+    public boolean isfullSyncSupported() {
+        return F_FULLFSYNC != 0;
+    }
+
+    public void fullSync() throws IOException, UnsupportedOperationException {
+        if (!isfullSyncSupported()) {
+            throw new UnsupportedOperationException();
+        }
+        int rc = fcntl(fd, F_FULLFSYNC);
+        if( rc == -1 ) {
+            throw error();
+        }
+    }
+
+    public boolean isDirectIOSupported() {
+        if (!HAVE_FCNTL_FUNCTION)
+            return false;
+        if (F_NOCACHE != 0)
+            return true;
+        if (O_DIRECT != 0)
+            return true;
+        return false;
+    }
+
+    public void enableDirectIO() throws IOException, UnsupportedOperationException {
+        if (F_NOCACHE != 0) {
+            int rc = fcntl(fd, F_NOCACHE);
+            if( rc == -1 ) {
+                throw error();
+            }
+        } else if (O_DIRECT != 0) {
+            int rc = fcntl(fd, F_GETFL);
+            if( rc == -1 ) {
+                throw error();
+            }
+            rc = fcntl(fd, F_SETFL, rc|O_DIRECT );
+            if( rc == -1 ) {
+                throw error();
+            }
+            
+        } else {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * allocates an initialized aiocb structure on the heap using the given
+     * parameters.
+     */
+    private long block(long offset, NativeAllocation buffer) throws OutOfMemoryError {
         aiocb cb = new aiocb();
         cb.aio_fildes = fd;
         cb.aio_offset = offset;
-        cb.aio_buf = buffer.pointer();        
+        cb.aio_buf = buffer.pointer();
         cb.aio_nbytes = buffer.length();
 
         long aiocbp = malloc(aiocb.SIZEOF);
-        if( aiocbp==NULL ) {
+        if (aiocbp == NULL) {
             throw new OutOfMemoryError();
         }
         aiocb.memmove(aiocbp, cb, aiocb.SIZEOF);
-        aio_write(aiocbp);
+        return aiocbp;
+    }
 
-        AioPollAgent agent = getAioPollAgent();
-        agent.watch(aiocbp, callback);
-        return;
-    }
-    
-    private AioPollAgent getAioPollAgent() {
-        if( aioPollAgent==null ) {
+    /**
+     * gets the poll agent that will be used to watch for completion of AIO
+     * requests.
+     * 
+     * @return
+     */
+    private AioPollAgent agent() {
+        if (aioPollAgent == null) {
             aioPollAgent = AioPollAgent.getMainPollAgent();
         }
         return aioPollAgent;
@@ -115,18 +301,4 @@
         this.aioPollAgent = aioPollAgent;
     }
 
-    /**
-     * does an async read, the callback gets executed once the read completes.
-     * 
-     * @param buffer
-     * @param callback
-     */
-    public void read(long offset, NativeAllocation buffer, Callback<Long> callback) throws IOException {
-        
-
-        return;
-    }
-    
-    
-    
 }

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/NativeAllocation.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/NativeAllocation.java?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/NativeAllocation.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/NativeAllocation.java Thu Nov 19 20:37:59 2009
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.syscall;
 
+import java.io.UnsupportedEncodingException;
+
 import org.apache.activemq.syscall.jni.CLibrary;
 
 import static org.apache.activemq.syscall.jni.CLibrary.*;
@@ -32,7 +34,7 @@
 
     final private long pointer;
     final private long length;
-    boolean allocated;
+    volatile byte allocated;
 
     public NativeAllocation(long pointer, long length) {
         if( pointer==NULL ) {
@@ -42,10 +44,6 @@
         this.length = length;
     }
     
-    static public NativeAllocation allocate(String value) {
-        return allocate(value.getBytes());
-    }
-
     private static NativeAllocation allocate(byte[] value) {
         int size = value.length;
         NativeAllocation rc = allocate(size);
@@ -55,24 +53,47 @@
     
     static public NativeAllocation allocate(long size) {
         NativeAllocation rc = new NativeAllocation(calloc(size,1), size);
-        rc.allocated = true;
+        rc.allocated = 1;
         return rc;
     }        
     
     public void free() {
-        if( freeCheck() ) {
+        // NOTE(review): the check-then-decrement below is NOT atomic; volatile only gives
+        // visibility, so two racing free() calls could both pass and release the pointer twice.
+        if( allocated==1 && (--allocated)==0 ) {
             CLibrary.free(pointer);
         }
     }
-    
-    private boolean freeCheck() {
-        if( allocated ) {
-            allocated=false;
-            return true;
+
+    /**
+     * This finalize is here as a fail safe to free up memory that was not freed
+     * manually. 
+     * 
+     * @see java.lang.Object#finalize()
+     */
+    protected void finalize() throws Throwable {
+        if( allocated==1 && (--allocated)==0 ) {
+            assert warnAboutALeak();
+            CLibrary.free(pointer);
         }
-        return false;
     }
     
+    private boolean warnAboutALeak() {
+        System.err.println(String.format("Warnning: memory leak avoided, a NativeAllocation was not freed: %d", pointer));
+        return true;
+    }
+
+    public NativeAllocation view(long off, long len) {
+        assert len >=0;
+        assert off >=0;
+        assert off+len <= length;
+        long ptr = pointer;
+        if( off > 0 ) {
+            ptr = void_pointer_add(ptr, off);
+        }
+        return new NativeAllocation(ptr, len);
+    }    
+
     public long pointer() {
         return pointer;
     }
@@ -85,4 +106,29 @@
         return length;
     }
 
+    public byte[] bytes() {
+        if( length > Integer.MAX_VALUE ) {
+            throw new IndexOutOfBoundsException("The native allocation is to large to convert to a java byte array");
+        }
+        byte rc[] = new byte[(int) length];
+        memmove(rc, pointer, length);
+        return rc;
+    }
+    
+    static public NativeAllocation allocate(String value) {
+        return allocate(value.getBytes());
+    }
+
+    static public NativeAllocation allocate(String value, String encoding) throws UnsupportedEncodingException {
+        return allocate(value.getBytes(encoding));
+    }
+
+    public String string() {
+        return new String(bytes());
+    }
+    
+    public String string(String encoding) throws UnsupportedEncodingException {
+        return new String(bytes(), encoding);
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java Thu Nov 19 20:37:59 2009
@@ -24,7 +24,6 @@
 import org.fusesource.hawtjni.runtime.JniMethod;
 
 import static org.fusesource.hawtjni.runtime.ArgFlag.*;
-
 import static org.fusesource.hawtjni.runtime.MethodFlag.*;
 
 /**

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/CLibrary.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/CLibrary.java?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/CLibrary.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/CLibrary.java Thu Nov 19 20:37:59 2009
@@ -38,7 +38,7 @@
     
     final public static long NULL = 0;
     
-    @JniMethod(flags={MethodFlag.CONSTANT})
+    @JniMethod(flags={MethodFlag.CONSTANT_GETTER})
     public static final native int errno();
 
     @JniMethod(cast="char *")

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/IO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/IO.java?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/IO.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/IO.java Thu Nov 19 20:37:59 2009
@@ -16,14 +16,17 @@
  */
 package org.apache.activemq.syscall.jni;
 
+import org.fusesource.hawtjni.runtime.ClassFlag;
+import org.fusesource.hawtjni.runtime.FieldFlag;
+import org.fusesource.hawtjni.runtime.JniArg;
 import org.fusesource.hawtjni.runtime.JniClass;
 import org.fusesource.hawtjni.runtime.JniField;
 import org.fusesource.hawtjni.runtime.JniMethod;
 
+import static org.fusesource.hawtjni.runtime.ArgFlag.*;
+import static org.fusesource.hawtjni.runtime.FieldFlag.*;
 import static org.fusesource.hawtjni.runtime.MethodFlag.*;
 
-import static org.fusesource.hawtjni.runtime.FieldFlag.CONSTANT;
-
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -38,6 +41,11 @@
         init();
     }
 
+    //////////////////////////////////////////////////////////////////
+    //
+    // Open mode constants.
+    //
+    //////////////////////////////////////////////////////////////////
     @JniField(flags={CONSTANT})
     public static int O_RDONLY;
     @JniField(flags={CONSTANT})
@@ -68,6 +76,21 @@
     @JniField(flags={CONSTANT}, conditional="#ifdef O_EVTONLY")
     public static int O_EVTONLY;
     
+    @JniField(flags={CONSTANT}, conditional="#ifdef O_DIRECT")
+    public static int O_DIRECT;
+    @JniField(flags={CONSTANT}, conditional="#ifdef O_CLOEXEC")
+    public static int O_CLOEXEC;
+    @JniField(flags={CONSTANT}, conditional="#ifdef O_DIRECTORY")
+    public static int O_DIRECTORY;
+    @JniField(flags={CONSTANT}, conditional="#ifdef O_LARGEFILE")
+    public static int O_LARGEFILE;
+    @JniField(flags={CONSTANT}, conditional="#ifdef O_NOATIME")
+    public static int O_NOATIME;
+    @JniField(flags={CONSTANT}, conditional="#ifdef O_NOCTTY")
+    public static int O_NOCTTY;
+    @JniField(flags={CONSTANT}, conditional="#ifdef O_SYNC")
+    public static int O_SYNC;
+    
     // Mode Constants
     @JniField(flags={CONSTANT}, conditional="#ifdef S_IRWXU")
     public static int S_IRWXU;
@@ -167,7 +190,10 @@
      * </pre></code>
      */
     public static final native int close(int fd);
-
+    
+    @JniField(flags={FieldFlag.CONSTANT}, conditional="#ifdef HAVE_FCNTL_FUNCTION", accessor="1")
+    public static boolean HAVE_FCNTL_FUNCTION;
+    
     /**
      * <code><pre>
      * int fcntl(int fd, int cmd, ...);
@@ -175,7 +201,89 @@
      */
     @JniMethod(conditional="#ifdef HAVE_FCNTL_FUNCTION")
     public static final native int fcntl(int fd, int cmd);
-        
+
+    /**
+     * <code><pre>
+     * int fcntl(int fd, int cmd, ...);
+     * </pre></code>
+     */
+    @JniMethod(conditional="#ifdef HAVE_FCNTL_FUNCTION")
+    public static final native int fcntl(int fd, int cmd, long arg);
+
     
+    @JniMethod(cast="size_t")
+    public static final native long write(
+            int fd, 
+            @JniArg(cast="const void *") long buffer, 
+            @JniArg(cast="size_t") long length);
+    
+    
+    @JniMethod(cast="size_t")
+    public static final native long pwrite(
+            int fd, 
+            @JniArg(cast="const void *") long buffer, 
+            @JniArg(cast="size_t") long length,
+            @JniArg(cast="size_t") long offset);
+
+    @JniMethod(cast="size_t")
+    public static final native long writev(
+            int fd, 
+            @JniArg(cast="const struct iovec *") long iov, 
+            int count);
+
+    @JniMethod(cast="size_t")
+    public static final native long read(
+            int fd, 
+            @JniArg(cast="void *") long buffer, 
+            @JniArg(cast="size_t") long length);
+
+    @JniMethod(cast="size_t")
+    public static final native long pread(
+            int fd, 
+            @JniArg(cast="void *") long buffer, 
+            @JniArg(cast="size_t") long length,
+            @JniArg(cast="size_t") long offset);
+
+    @JniMethod(cast="size_t")
+    public static final native long readv(
+            int fd, 
+            @JniArg(cast="const struct iovec *") long iov, 
+            int count);
+
+
+    @JniClass(flags={ClassFlag.STRUCT})
+    static public class iovec {
 
+        static {
+            CLibrary.LIBRARY.load();
+            init();
+        }
+
+        @JniMethod(flags={CONSTANT_INITIALIZER})
+        private static final native void init();
+
+        @JniField(flags={FieldFlag.CONSTANT}, accessor="sizeof(struct iovec)")
+        public static int SIZEOF;
+        
+        @JniField(cast="char *")
+        public long iov_base;  
+        @JniField(cast="size_t")
+        public long iov_len;
+        
+        public static final native void memmove (
+                @JniArg(cast="void *", flags={NO_IN, CRITICAL}) iovec dest, 
+                @JniArg(cast="const void *") long src, 
+                @JniArg(cast="size_t") long size);
+        
+        public static final native void memmove (
+                @JniArg(cast="void *") long dest, 
+                @JniArg(cast="const void *", flags={NO_OUT, CRITICAL}) iovec src, 
+                @JniArg(cast="size_t") long size);
+        
+        @JniMethod(cast="struct iovec *", accessor="add")
+        public static final native long iovec_add(
+                @JniArg(cast="struct iovec *") long ptr, 
+                long amount);        
+    }      
+    
 }

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac Thu Nov 19 20:37:59 2009
@@ -57,6 +57,8 @@
               
 AC_CHECK_HEADER([sys/errno.h],[AC_DEFINE([HAVE_SYS_ERRNO_H], [1], [Define to 1 if you have the <sys/errno.h> header file.])])
 AC_CHECK_HEADER([sys/stat.h],[AC_DEFINE([HAVE_SYS_STAT_H], [1], [Define to 1 if you have the <sys/stat.h> header file.])])
+AC_CHECK_HEADER([sys/types.h],[AC_DEFINE([HAVE_SYS_TYPES_H], [1], [Define to 1 if you have the <sys/types.h> header file.])])
+AC_CHECK_HEADER([sys/uio.h],[AC_DEFINE([HAVE_SYS_UIO_H], [1], [Define to 1 if you have the <sys/uio.h> header file.])])
 
 CFLAGS="$CFLAGS $JNI_EXTRA_CFLAGS"
 LDFLAGS="$LDFLAGS $JNI_EXTRA_LDFLAGS -release $PACKAGE_VERSION"

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h Thu Nov 19 20:37:59 2009
@@ -20,6 +20,14 @@
   #define fcntl _fcntl
 #endif
 
+#ifdef HAVE_SYS_TYPES_H
+  #include <sys/types.h>
+#endif
+
+#ifdef HAVE_SYS_UIO_H
+  #include <sys/uio.h>
+#endif
+
 #ifdef HAVE_UNISTD_H
   #include <unistd.h>
 #endif
@@ -44,6 +52,8 @@
   #include <sys/stat.h>
 #endif
 
+
+
 #include <fcntl.h>
 
 #define add(value1, value2) ((value1)+value2)

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java Thu Nov 19 20:37:59 2009
@@ -47,7 +47,7 @@
 
         File file = dataFile(FileDescriptorTest.class.getName()+".writeWithACallback.data");
         
-        int oflags = O_NONBLOCK | O_CREAT | O_TRUNC | O_RDWR;
+        int oflags = O_NONBLOCK | O_CREAT | O_TRUNC | O_WRONLY;
         int mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
         FileDescriptor fd = FileDescriptor.open(file, oflags, mode);
         
@@ -60,8 +60,37 @@
         } finally {
             fd.dispose();
         }
-
+        
         assertEquals(expected, readFile(file));
+        buffer.free();
     }
     
+    
+    
+    @Test
+    public void readWithACallback() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+        assumeThat(AIO.SUPPORTED, is(true));
+        
+        String expected = generateString(1024*4);
+        
+        File file = dataFile(FileDescriptorTest.class.getName()+".writeWithACallback.data");
+        writeFile(file, expected);
+
+        NativeAllocation buffer = allocate(expected.length());
+
+        int oflags = O_NONBLOCK | O_RDONLY;
+        FileDescriptor fd = FileDescriptor.open(file, oflags);
+        
+        try {
+            FutureCallback<Long> future = new FutureCallback<Long>();
+            fd.read(0, buffer, future);
+            long count = future.get(1, TimeUnit.SECONDS);
+            assertEquals(count, buffer.length());
+        } finally {
+            fd.dispose();
+        }
+
+        assertEquals(expected, buffer.string() );
+        buffer.free();
+    }    
 }

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java?rev=882286&r1=882285&r2=882286&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java Thu Nov 19 20:37:59 2009
@@ -46,7 +46,7 @@
         File file = dataFile(AIOTest.class.getName()+".write.data");
 
         String expected = generateString(1024*4);
-        NativeAllocation writeBuffer = allocate(expected);
+        NativeAllocation buffer = allocate(expected);
 
         long aiocbp = malloc(aiocb.SIZEOF);
         System.out.println("Allocated cb of size: "+aiocb.SIZEOF);
@@ -64,8 +64,8 @@
             cb.aio_fildes = fd;
             cb.aio_offset = 0;
             // The what:
-            cb.aio_buf = writeBuffer.pointer();        
-            cb.aio_nbytes = writeBuffer.length();
+            cb.aio_buf = buffer.pointer();        
+            cb.aio_nbytes = buffer.length();
             
             // Move the struct into the c heap.
             aiocb.memmove(aiocbp, cb, aiocb.SIZEOF);
@@ -87,13 +87,13 @@
 
             // The full buffer should have been written.
             long count = aio_return(aiocbp);
-            assertEquals(count, writeBuffer.length());
+            assertEquals(count, buffer.length());
             
             checkrc(close(fd));
             
         } finally {
             // Lets free up allocated memory..
-            writeBuffer.free();
+            buffer.free();
             if( aiocbp!=NULL ) {
                 free(aiocbp);
             }
@@ -103,9 +103,9 @@
     }
 
 
-    private void checkrc(int rc) throws IOException {
+    private void checkrc(int rc) {
         if( rc==-1 ) {
-            throw new IOException("IO failure: "+string(strerror(errno())));
+            fail("IO failure: "+string(strerror(errno())));
         }
     }
 

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java?rev=882286&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java Thu Nov 19 20:37:59 2009
@@ -0,0 +1,50 @@
+package org.apache.activemq.syscall.jni;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.apache.activemq.syscall.TestSupport.*;
+import static org.apache.activemq.syscall.jni.CLibrary.*;
+import static org.apache.activemq.syscall.jni.IO.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+public class IOTest {
+
+    @Test
+    public void fcntl_GETFL() throws IOException, InterruptedException {
+        assumeThat(HAVE_FCNTL_FUNCTION, is(true));
+        File file = dataFile(AIOTest.class.getName() + ".direct.data");
+        int fd = -1; // -1 means "not open"; starting at 0 would make the finally block close stdin
+        try {
+            // open the file...
+            int oflags = O_CREAT | O_WRONLY | O_APPEND;
+            int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+            fd = open(file.getCanonicalPath(), oflags, mode);
+            checkrc(fd);
+
+            int rc = fcntl(fd, F_GETFL);
+            checkrc(rc);
+            assertTrue((rc & O_APPEND) != 0);
+
+        } finally {
+            if (fd != -1) { checkrc(close(fd)); } // skip close when open never succeeded, so its error is not masked
+        }
+    }
+
+    private void checkrc(int rc) {
+        if (rc == -1) {
+            fail("IO failure: " + string(strerror(errno())));
+        }
+    }
+
+    @Test
+    public void testFree() {
+        long ptr = malloc(100);
+        free(ptr);
+    }
+
+}