You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/11/24 02:49:49 UTC

svn commit: r883569 - in /activemq/sandbox/activemq-apollo/activemq-syscall/src: main/java/org/apache/activemq/syscall/jni/ main/native-package/ main/native-package/src/ test/java/org/apache/activemq/syscall/jni/

Author: chirino
Date: Tue Nov 24 01:49:33 2009
New Revision: 883569

URL: http://svn.apache.org/viewvc?rev=883569&view=rev
Log:
added a LibAIO test which helped flush out the implementation.


Added:
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/LibAIOTest.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/PosixAIOTest.java
Removed:
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java
Modified:
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/LibAIO.java
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac
    activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h
    activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/LibAIO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/LibAIO.java?rev=883569&r1=883568&r2=883569&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/LibAIO.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/LibAIO.java Tue Nov 24 01:49:33 2009
@@ -23,6 +23,7 @@
 import org.fusesource.hawtjni.runtime.JniField;
 import org.fusesource.hawtjni.runtime.JniMethod;
 
+import static org.fusesource.hawtjni.runtime.ArgFlag.*;
 import static org.fusesource.hawtjni.runtime.MethodFlag.*;
 
 /**
@@ -45,7 +46,7 @@
     
     
     @JniClass(flags={ClassFlag.STRUCT}, conditional="defined(HAVE_LIBAIO_H)")
-    public static final class iocb {
+    public static class iocb {
         static {
             CLibrary.LIBRARY.load();
             init();
@@ -55,20 +56,62 @@
 
         @JniField(flags={FieldFlag.CONSTANT}, accessor="sizeof(struct iocb)")
         public static int SIZEOF;
+
+        @JniField(flags={FieldFlag.CONSTANT}, accessor="offsetof(struct iocb, u)")
+        public static int OFFSETOF_u;
         
         @JniField(cast="void *")
-        long data;
+        public long data;
         @JniField(cast="unsigned")
-        int key;
+        public int key;
         @JniField(cast="short")
-        short aio_lio_opcode; 
+        public short aio_lio_opcode; 
         @JniField(cast="short")
-        short aio_reqprio;
+        public short aio_reqprio;
         @JniField(cast="int")
-        int aio_fildes;
+        public int aio_fildes;
+        
+        public static final native void memmove (
+                @JniArg(cast="void *", flags={NO_IN, CRITICAL}) iocb dest, 
+                @JniArg(cast="const void *") long src, 
+                @JniArg(cast="size_t") long size);
+        
+        public static final native void memmove (
+                @JniArg(cast="void *") long dest, 
+                @JniArg(cast="const void *", flags={NO_OUT, CRITICAL}) iocb src, 
+                @JniArg(cast="size_t") long size);        
     }
 
     @JniClass(flags={ClassFlag.STRUCT}, conditional="defined(HAVE_LIBAIO_H)")
+    public static final class io_iocb_common {
+        
+        static {
+            CLibrary.LIBRARY.load();
+            init();
+        }
+        @JniMethod(flags={CONSTANT_INITIALIZER})
+        private static final native void init();
+
+        @JniField(flags={FieldFlag.CONSTANT}, accessor="sizeof(struct io_iocb_common)")
+        public static int SIZEOF;
+        
+        @JniField(cast="void *")
+        long buf;
+        long nbytes;
+        long offset;
+        
+        public static final native void memmove (
+                @JniArg(cast="void *", flags={NO_IN, CRITICAL}) io_iocb_common dest, 
+                @JniArg(cast="const void *") long src, 
+                @JniArg(cast="size_t") long size);
+        
+        public static final native void memmove (
+                @JniArg(cast="void *") long dest, 
+                @JniArg(cast="const void *", flags={NO_OUT, CRITICAL}) io_iocb_common src, 
+                @JniArg(cast="size_t") long size);        
+    }
+    
+    @JniClass(flags={ClassFlag.STRUCT}, conditional="defined(HAVE_LIBAIO_H)")
     public static final class io_event {
         static {
             CLibrary.LIBRARY.load();
@@ -81,11 +124,22 @@
         public static int SIZEOF;
 
         @JniField(cast="void *")
-        long data;
+        public long data;
         @JniField(cast="struct iocb *")
-        long obj;
-        long res;
-        long res2;
+        public long obj;
+        public long res;
+        public long res2;
+
+        public static final native void memmove (
+                @JniArg(cast="void *", flags={NO_IN, CRITICAL}) io_event dest, 
+                @JniArg(cast="const void *") long src, 
+                @JniArg(cast="size_t") long size);
+        
+        public static final native void memmove (
+                @JniArg(cast="void *") long dest, 
+                @JniArg(cast="const void *", flags={NO_OUT, CRITICAL}) io_event src, 
+                @JniArg(cast="size_t") long size);        
+    
     };
 
     ///////////////////////////////////////////////////////////////////
@@ -127,24 +181,24 @@
     public static final native int io_queue_release(
             @JniArg(cast="struct io_context *") long ctx);
     public static final native int io_queue_run(
-            @JniArg(cast="struct io_context **") long ctx);
+            @JniArg(cast="struct io_context *") long ctx);
 
     public static final native void io_set_callback(
             @JniArg(cast="struct iocb *")long iocb, 
-            @JniArg(cast="io_callback_t")long  cb);
+            @JniArg(cast="void *")long  cb);
 
     public static final native void io_prep_pread(
             @JniArg(cast="struct iocb *")long iocb, 
             int fd, 
             @JniArg(cast="void *") long buf, 
-            @JniArg(cast="site_t") long count, 
+            @JniArg(cast="size_t") long count, 
             long offset);
 
     public static final native void io_prep_pwrite(
             @JniArg(cast="struct iocb *")long iocb, 
             int fd, 
             @JniArg(cast="void *") long buf, 
-            @JniArg(cast="site_t") long count, 
+            @JniArg(cast="size_t") long count, 
             long offset);
 
     public static final native void io_prep_preadv(
@@ -164,7 +218,7 @@
     public static final native int io_poll(
             @JniArg(cast="struct io_context *")long ctx, 
             @JniArg(cast="struct iocb *")long iocb, 
-            @JniArg(cast="io_callback_t")long  cb, 
+            @JniArg(cast="void *")long  cb, 
             int fd, 
             int events);
 
@@ -175,7 +229,7 @@
     public static final native int io_fsync(
             @JniArg(cast="struct io_context *")long ctx, 
             @JniArg(cast="struct iocb *")long iocb, 
-            @JniArg(cast="io_callback_t")long  cb, 
+            @JniArg(cast="void *")long  cb, 
             int fd);
 
     public static final native void io_prep_fdsync(
@@ -185,7 +239,7 @@
     public static final native int io_fdsync(
             @JniArg(cast="struct io_context *")long ctx, 
             @JniArg(cast="struct iocb *")long iocb, 
-            @JniArg(cast="io_callback_t")long  cb, 
+            @JniArg(cast="void *")long  cb, 
             int fd);
 
     public static final native void io_set_eventfd(

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac?rev=883569&r1=883568&r2=883569&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/configure.ac Tue Nov 24 01:49:33 2009
@@ -54,8 +54,8 @@
 
 # Look for the libaio api..
 AC_CHECK_HEADER([libaio.h],[
-  AC_CHECK_LIB([libaio], [io_queue_run],[
-    LDFLAGS="$LDFLAGS -llibaio"
+  AC_CHECK_LIB([aio], [io_queue_run],[
+    LDFLAGS="$LDFLAGS -laio"
     AC_DEFINE([HAVE_LIBAIO_H], [1], [Define to 1 if you have the <libaio.h> header file.])
     break
   ])  
@@ -70,6 +70,7 @@
 AC_CHECK_HEADER([sys/stat.h],[AC_DEFINE([HAVE_SYS_STAT_H], [1], [Define to 1 if you have the <sys/stat.h> header file.])])
 AC_CHECK_HEADER([sys/types.h],[AC_DEFINE([HAVE_SYS_TYPES_H], [1], [Define to 1 if you have the <sys/types.h> header file.])])
 AC_CHECK_HEADER([sys/uio.h],[AC_DEFINE([HAVE_SYS_UIO_H], [1], [Define to 1 if you have the <sys/uio.h> header file.])])
+AC_CHECK_HEADER([stddef.h],[AC_DEFINE([HAVE_STDDEF_H], [1], [Define to 1 if you have the <stddef.h> header file.])])
 
 CFLAGS="$CFLAGS $JNI_EXTRA_CFLAGS"
 LDFLAGS="$LDFLAGS $JNI_EXTRA_LDFLAGS -release $PACKAGE_VERSION"

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h?rev=883569&r1=883568&r2=883569&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/native-package/src/activemq-syscall.h Tue Nov 24 01:49:33 2009
@@ -46,6 +46,10 @@
   #include <string.h>
 #endif
 
+#ifdef HAVE_LIBAIO_H
+  #include <libaio.h>
+#endif
+
 #ifdef HAVE_AIO_H
   #include <aio.h>
 #endif
@@ -62,6 +66,10 @@
   #include <io.h>
 #endif
 
+#ifdef HAVE_STDDEF_H
+  #include <stddef.h>
+#endif
+
 
 #include <fcntl.h>
 

Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java?rev=883569&r1=883568&r2=883569&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/IOTest.java Tue Nov 24 01:49:33 2009
@@ -17,7 +17,7 @@
     @Test
     public void fcntl_GETFL() throws IOException, InterruptedException {
         assumeThat(HAVE_FCNTL_FUNCTION, is(true));
-        File file = dataFile(AIOTest.class.getName() + ".direct.data");
+        File file = dataFile(IOTest.class.getName() + ".direct.data");
         int fd = 0;
         try {
             // open the file...

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/LibAIOTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/LibAIOTest.java?rev=883569&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/LibAIOTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/LibAIOTest.java Tue Nov 24 01:49:33 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.syscall.jni;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.syscall.NativeAllocation;
+import org.apache.activemq.syscall.jni.LibAIO.io_event;
+import org.apache.activemq.syscall.jni.LibAIO.iocb;
+import org.junit.Test;
+
+import static org.apache.activemq.syscall.NativeAllocation.*;
+import static org.apache.activemq.syscall.TestSupport.*;
+import static org.apache.activemq.syscall.jni.CLibrary.*;
+import static org.apache.activemq.syscall.jni.IO.*;
+import static org.apache.activemq.syscall.jni.LibAIO.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *
+ */
+public class LibAIOTest {
+    
+    @Test
+    public void write() throws IOException, InterruptedException {
+    	assumeThat(LibAIO.SUPPORTED, is(true));
+    	 
+        File file = dataFile(LibAIOTest.class.getName()+".write.data");
+
+        String expected = generateString(1024*4);
+        NativeAllocation buffer = allocate(expected);
+
+        long iocbp = malloc(iocb.SIZEOF);
+        long io_eventp = malloc(io_event.SIZEOF);
+
+        long ctx_ida[] = new long[1];
+        int rc = io_setup(10, ctx_ida);
+        assertEquals(0, rc);
+        
+        long ctx_id = ctx_ida[0];
+
+        try {
+            // open the file...
+            int oflags = O_NONBLOCK | O_CREAT | O_TRUNC| O_RDWR;
+            int mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
+            int fd = open(file.getCanonicalPath(), oflags, mode);
+            checkrc(fd);
+            
+            // enqueue the async write..
+            io_prep_pwrite(iocbp, fd, buffer.pointer(), buffer.length(), 0);
+            rc = io_submit(ctx_id, 1, new long[]{iocbp});
+            assertEquals(1, rc);
+
+            // Wait for the IO to complete.
+            rc = io_getevents(ctx_id, 1, 1, io_eventp, NULL);
+            assertEquals(1, rc);
+            
+            // Copy the completion event out of native memory.
+            // The full buffer should have been written.
+            io_event event = new io_event();                        
+            io_event.memmove(event, io_eventp, io_event.SIZEOF);
+            
+            assertEquals(0, event.res2);
+            assertEquals(buffer.length(), event.res);
+            assertEquals(event.obj, iocbp);
+            
+            checkrc(close(fd));
+            
+        } finally {
+            // Let's free up the allocated memory.
+            io_destroy(ctx_id);            
+            buffer.free();
+            if( iocbp!=NULL ) {
+                free(iocbp);
+            }
+            if( io_eventp!=NULL ) { 
+                free(io_eventp);
+            }
+                
+        }
+        
+        assertEquals(expected, readFile(file));
+    }
+
+
+    private void checkrc(int rc) {
+        if( rc==-1 ) {
+            fail("IO failure: "+string(strerror(errno())));
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/PosixAIOTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/PosixAIOTest.java?rev=883569&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/PosixAIOTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/PosixAIOTest.java Tue Nov 24 01:49:33 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.syscall.jni;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.syscall.NativeAllocation;
+import org.apache.activemq.syscall.jni.PosixAIO.aiocb;
+import org.junit.Test;
+
+import static org.apache.activemq.syscall.NativeAllocation.*;
+import static org.apache.activemq.syscall.TestSupport.*;
+import static org.apache.activemq.syscall.jni.PosixAIO.*;
+import static org.apache.activemq.syscall.jni.CLibrary.*;
+import static org.apache.activemq.syscall.jni.IO.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *
+ */
+public class PosixAIOTest {
+    
+    @Test
+    public void write() throws IOException, InterruptedException {
+    	assumeThat(PosixAIO.SUPPORTED, is(true));
+    	 
+        File file = dataFile(PosixAIOTest.class.getName()+".write.data");
+
+        String expected = generateString(1024*4);
+        NativeAllocation buffer = allocate(expected);
+
+        long aiocbp = malloc(aiocb.SIZEOF);
+        System.out.println("Allocated cb of size: "+aiocb.SIZEOF);
+
+        try {
+            // open the file...
+            int oflags = O_NONBLOCK | O_CREAT | O_TRUNC| O_RDWR;
+            int mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
+            int fd = open(file.getCanonicalPath(), oflags, mode);
+            checkrc(fd);
+            
+            // Create a control block..
+            // Where to write: the file descriptor and offset.
+            aiocb cb = new aiocb();
+            cb.aio_fildes = fd;
+            cb.aio_offset = 0;
+            // What to write: the buffer pointer and its length.
+            cb.aio_buf = buffer.pointer();        
+            cb.aio_nbytes = buffer.length();
+            
+            // Move the struct into the c heap.
+            aiocb.memmove(aiocbp, cb, aiocb.SIZEOF);
+
+            // enqueue the async write..
+            checkrc(aio_write(aiocbp));
+            
+            long blocks[] = new long[]{aiocbp};
+            
+            // Wait for the IO to complete.
+            long timeout = NULL; // To suspend forever.
+            checkrc(aio_suspend(blocks, blocks.length, timeout));
+            
+            // Check that the request completed; it should have,
+            // since aio_suspend has already returned.
+            int rc = aio_error(aiocbp);
+            checkrc(rc);
+            assertEquals(0, rc);
+
+            // The full buffer should have been written.
+            long count = aio_return(aiocbp);
+            assertEquals(count, buffer.length());
+            
+            checkrc(close(fd));
+            
+        } finally {
+            // Let's free up the allocated memory.
+            buffer.free();
+            if( aiocbp!=NULL ) {
+                free(aiocbp);
+            }
+        }
+        
+        assertEquals(expected, readFile(file));
+    }
+
+
+    private void checkrc(int rc) {
+        if( rc==-1 ) {
+            fail("IO failure: "+string(strerror(errno())));
+        }
+    }
+
+}