You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC

svn commit: r563982 [5/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java Wed Aug  8 11:56:59 2007
@@ -40,18 +40,19 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * A factory of the ActiveMQ InitialContext which contains {@link ConnectionFactory}
- * instances as well as a child context called <i>destinations</i> which contain all of the
- * current active destinations, in child context depending on the QoS such as
- * transient or durable and queue or topic.
- *
+ * A factory of the ActiveMQ InitialContext which contains
+ * {@link ConnectionFactory} instances as well as a child context called
+ * <i>destinations</i> which contain all of the current active destinations, in
+ * child context depending on the QoS such as transient or durable and queue or
+ * topic.
+ * 
  * @version $Revision: 1.2 $
  */
 public class ActiveMQInitialContextFactory implements InitialContextFactory {
 
-    private static final String[] defaultConnectionFactoryNames = {
-        "ConnectionFactory", "QueueConnectionFactory", "TopicConnectionFactory"
-    };
+    private static final String[] defaultConnectionFactoryNames = {"ConnectionFactory",
+                                                                   "QueueConnectionFactory",
+                                                                   "TopicConnectionFactory"};
 
     private String connectionPrefix = "connection.";
     private String queuePrefix = "queue.";
@@ -62,34 +63,29 @@
         Map data = new ConcurrentHashMap();
         String[] names = getConnectionFactoryNames(environment);
         for (int i = 0; i < names.length; i++) {
-            ActiveMQConnectionFactory factory =null;
+            ActiveMQConnectionFactory factory = null;
             String name = names[i];
 
-            try{
-             factory = createConnectionFactory(name, environment);
-            }catch(Exception e){
+            try {
+                factory = createConnectionFactory(name, environment);
+            } catch (Exception e) {
                 throw new NamingException("Invalid broker URL");
 
             }
-       /*     if( broker==null ) {
-                try {
-                    broker = factory.getEmbeddedBroker();
-                }
-                catch (JMSException e) {
-                    log.warn("Failed to get embedded broker", e);
-                }
-            }
-       */
-            data.put(name,factory);
+            /*
+             * if( broker==null ) { try { broker = factory.getEmbeddedBroker(); }
+             * catch (JMSException e) { log.warn("Failed to get embedded
+             * broker", e); } }
+             */
+            data.put(name, factory);
         }
 
         createQueues(data, environment);
         createTopics(data, environment);
         /*
-        if (broker != null) {
-            data.put("destinations", broker.getDestinationContext(environment));
-        }
-        */
+         * if (broker != null) { data.put("destinations",
+         * broker.getDestinationContext(environment)); }
+         */
         data.put("dynamicQueues", new LazyCreateContext() {
             private static final long serialVersionUID = 6503881346214855588L;
 
@@ -109,7 +105,7 @@
     }
 
     // Properties
-    //-------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
     public String getTopicPrefix() {
         return topicPrefix;
     }
@@ -127,19 +123,20 @@
     }
 
     // Implementation methods
-    //-------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
 
     protected ReadOnlyContext createContext(Hashtable environment, Map data) {
         return new ReadOnlyContext(environment, data);
     }
 
-    protected ActiveMQConnectionFactory createConnectionFactory(String name, Hashtable environment)   throws URISyntaxException {
+    protected ActiveMQConnectionFactory createConnectionFactory(String name, Hashtable environment)
+        throws URISyntaxException {
         Hashtable temp = new Hashtable(environment);
-        String prefix = connectionPrefix+name+".";
+        String prefix = connectionPrefix + name + ".";
         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry) iter.next();
-            String key = (String) entry.getKey();
-            if( key.startsWith(prefix) ) {
+            Map.Entry entry = (Map.Entry)iter.next();
+            String key = (String)entry.getKey();
+            if (key.startsWith(prefix)) {
                 // Rename the key...
                 temp.remove(key);
                 key = key.substring(prefix.length());
@@ -150,10 +147,11 @@
     }
 
     protected String[] getConnectionFactoryNames(Map environment) {
-        String factoryNames = (String) environment.get("connectionFactoryNames");
+        String factoryNames = (String)environment.get("connectionFactoryNames");
         if (factoryNames != null) {
             List list = new ArrayList();
-            for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens();) {
+            for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration
+                .hasMoreTokens();) {
                 list.add(enumeration.nextToken().trim());
             }
             int size = list.size();
@@ -168,7 +166,7 @@
 
     protected void createQueues(Map data, Hashtable environment) {
         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry) iter.next();
+            Map.Entry entry = (Map.Entry)iter.next();
             String key = entry.getKey().toString();
             if (key.startsWith(queuePrefix)) {
                 String jndiName = key.substring(queuePrefix.length());
@@ -179,7 +177,7 @@
 
     protected void createTopics(Map data, Hashtable environment) {
         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry) iter.next();
+            Map.Entry entry = (Map.Entry)iter.next();
             String key = entry.getKey().toString();
             if (key.startsWith(topicPrefix)) {
                 String jndiName = key.substring(topicPrefix.length());
@@ -201,11 +199,13 @@
     protected Topic createTopic(String name) {
         return new ActiveMQTopic(name);
     }
-	
+
     /**
-     * Factory method to create a new connection factory from the given environment
+     * Factory method to create a new connection factory from the given
+     * environment
      */
-    protected ActiveMQConnectionFactory createConnectionFactory(Hashtable environment) throws URISyntaxException {
+    protected ActiveMQConnectionFactory createConnectionFactory(Hashtable environment)
+        throws URISyntaxException {
         ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory();
         Properties properties = new Properties();
         properties.putAll(environment);
@@ -216,10 +216,9 @@
     public String getConnectionPrefix() {
         return connectionPrefix;
     }
-    
 
     public void setConnectionPrefix(String connectionPrefix) {
         this.connectionPrefix = connectionPrefix;
     }
-    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java Wed Aug  8 11:56:59 2007
@@ -19,12 +19,13 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+
 /**
  * Implementation of a Marshaller for byte arrays
  * 
  * @version $Revision: 1.2 $
  */
-public class BytesMarshaller implements Marshaller{
+public class BytesMarshaller implements Marshaller {
     /**
      * Write the payload of this entry to the RawContainer
      * 
@@ -32,8 +33,8 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(Object object,DataOutput dataOut) throws IOException{
-        byte[] data=(byte[]) object;
+    public void writePayload(Object object, DataOutput dataOut) throws IOException {
+        byte[] data = (byte[])object;
         dataOut.writeInt(data.length);
         dataOut.write(data);
     }
@@ -45,9 +46,9 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public Object readPayload(DataInput dataIn) throws IOException{
-        int size=dataIn.readInt();
-        byte[] data=new byte[size];
+    public Object readPayload(DataInput dataIn) throws IOException {
+        int size = dataIn.readInt();
+        byte[] data = new byte[size];
         dataIn.readFully(data);
         return data;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java Wed Aug  8 11:56:59 2007
@@ -20,77 +20,77 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+
 /**
  * Used by RootContainers
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class ContainerId implements Externalizable{
-    private static final long serialVersionUID=-8883779541021821943L;
+public class ContainerId implements Externalizable {
+    private static final long serialVersionUID = -8883779541021821943L;
     private Object key;
     private String dataContainerName;
 
     public ContainerId() {
     }
-    
-    public ContainerId(Object key,String dataContainerName) {
-        this.key=key;
-        this.dataContainerName=dataContainerName;
+
+    public ContainerId(Object key, String dataContainerName) {
+        this.key = key;
+        this.dataContainerName = dataContainerName;
     }
-    
-    
+
     /**
      * @return Returns the dataContainerPrefix.
      */
-    public String getDataContainerName(){
+    public String getDataContainerName() {
         return dataContainerName;
     }
 
     /**
      * @param dataContainerName The dataContainerPrefix to set.
      */
-    public void setDataContainerName(String dataContainerName){
-        this.dataContainerName=dataContainerName;
+    public void setDataContainerName(String dataContainerName) {
+        this.dataContainerName = dataContainerName;
     }
 
     /**
      * @return Returns the key.
      */
-    public Object getKey(){
+    public Object getKey() {
         return key;
     }
 
     /**
      * @param key The key to set.
      */
-    public void setKey(Object key){
-        this.key=key;
+    public void setKey(Object key) {
+        this.key = key;
     }
-    
-    public int hashCode(){
+
+    public int hashCode() {
         return key.hashCode();
     }
-    
-    public boolean equals(Object obj){
+
+    public boolean equals(Object obj) {
         boolean result = false;
-        if (obj != null && obj instanceof ContainerId){
-            ContainerId other = (ContainerId) obj;
+        if (obj != null && obj instanceof ContainerId) {
+            ContainerId other = (ContainerId)obj;
             result = other.key.equals(this.key);
         }
         return result;
     }
 
-    public void writeExternal(ObjectOutput out) throws IOException{
+    public void writeExternal(ObjectOutput out) throws IOException {
         out.writeUTF(getDataContainerName());
         out.writeObject(key);
     }
 
-    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
-        dataContainerName=in.readUTF();
-        key=in.readObject();
-    }
-    
-    public String toString(){
-        return "CID{"+dataContainerName + ":" + key + "}";
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        dataContainerName = in.readUTF();
+        key = in.readObject();
+    }
+
+    public String toString() {
+        return "CID{" + dataContainerName + ":" + key + "}";
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java Wed Aug  8 11:56:59 2007
@@ -18,16 +18,17 @@
 import java.util.NoSuchElementException;
 
 /**
- * Represents a container of persistent objects in the store Acts as a map, but values can be retrieved in insertion
- * order
+ * Represents a container of persistent objects in the store Acts as a map, but
+ * values can be retrieved in insertion order
  * 
  * @version $Revision: 1.2 $
  */
-public interface ListContainer<V> extends List<V>{
+public interface ListContainer<V> extends List<V> {
 
     /**
-     * The container is created or retrieved in an unloaded state. load populates the container will all the indexes
-     * used etc and should be called before any operations on the container
+     * The container is created or retrieved in an unloaded state. load
+     * populates the container will all the indexes used etc and should be
+     * called before any operations on the container
      */
     public void load();
 
@@ -43,7 +44,8 @@
     public boolean isLoaded();
 
     /**
-     * For homogenous containers can set a custom marshaller for loading values The default uses Object serialization
+     * For homogenous containers can set a custom marshaller for loading values
+     * The default uses Object serialization
      * 
      * @param marshaller
      */
@@ -67,8 +69,8 @@
     public void addFirst(V o);
 
     /**
-     * Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
-     * only for consistency.)
+     * Appends the given element to the end of this list. (Identical in function
+     * to the <tt>add</tt> method; included only for consistency.)
      * 
      * @param o the element to be inserted at the end of this list.
      */
@@ -91,7 +93,8 @@
     public V removeLast();
 
     /**
-     * remove an objecr from the list without retrieving the old value from the store
+     * remove an objecr from the list without retrieving the old value from the
+     * store
      * 
      * @param position
      * @return true if successful
@@ -107,7 +110,8 @@
     public StoreEntry placeLast(V object);
 
     /**
-     * insert an Object in first position int the list but get a StoreEntry of its position
+     * insert an Object in first position int the list but get a StoreEntry of
+     * its position
      * 
      * @param object
      * @return the location in the Store
@@ -115,12 +119,13 @@
     public StoreEntry placeFirst(V object);
 
     /**
-     * Advanced feature = must ensure the object written doesn't overwrite other objects in the container
+     * Advanced feature = must ensure the object written doesn't overwrite other
+     * objects in the container
      * 
      * @param entry
      * @param object
      */
-    public void update(StoreEntry entry,V object);
+    public void update(StoreEntry entry, V object);
 
     /**
      * Retrieve an Object from the Store by its location
@@ -167,10 +172,11 @@
      * @return true if successful
      */
     public boolean remove(StoreEntry entry);
-    
+
     /**
-     * It's possible that a StoreEntry could be come stale
-     * this will return an upto date entry for the StoreEntry position
+     * It's possible that a StoreEntry could be come stale this will return an
+     * upto date entry for the StoreEntry position
+     * 
      * @param entry old entry
      * @return a refreshed StoreEntry
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Wed Aug  8 11:56:59 2007
@@ -21,47 +21,48 @@
 import java.util.Set;
 
 /**
- *Represents a container of persistent objects in the store
- *Acts as a map, but values can be retrieved in insertion order
+ * Represents a container of persistent objects in the store Acts as a map, but
+ * values can be retrieved in insertion order
  * 
  * @version $Revision: 1.2 $
  */
-public interface MapContainer<K, V> extends Map<K, V>{
-    
-    
-    /**
-     * The container is created or retrieved in 
-     * an unloaded state.
-     * load populates the container will all the indexes used etc
-     * and should be called before any operations on the container
+public interface MapContainer<K, V> extends Map<K, V> {
+
+    /**
+     * The container is created or retrieved in an unloaded state. load
+     * populates the container will all the indexes used etc and should be
+     * called before any operations on the container
      */
     public void load();
-    
+
     /**
      * unload indexes from the container
-     *
+     * 
      */
     public void unload();
-    
+
     /**
      * @return true if the indexes are loaded
      */
     public boolean isLoaded();
-    
+
     /**
      * For homogenous containers can set a custom marshaller for loading keys
      * The default uses Object serialization
+     * 
      * @param keyMarshaller
      */
     public void setKeyMarshaller(Marshaller<K> keyMarshaller);
-    
+
     /**
      * For homogenous containers can set a custom marshaller for loading values
      * The default uses Object serialization
-     * @param valueMarshaller 
-   
+     * 
+     * @param valueMarshaller
+     * 
      */
     public void setValueMarshaller(Marshaller<V> valueMarshaller);
+
     /**
      * @return the id the MapContainer was create with
      */
@@ -78,30 +79,31 @@
     public boolean isEmpty();
 
     /**
-     * @param key 
+     * @param key
      * @return true if the container contains the key
      */
     public boolean containsKey(K key);
 
     /**
      * Get the value associated with the key
-     * @param key 
+     * 
+     * @param key
      * @return the value associated with the key from the store
      */
     public V get(K key);
 
-    
     /**
-     * @param o 
+     * @param o
      * @return true if the MapContainer contains the value o
      */
     public boolean containsValue(K o);
 
     /**
      * Add add entries in the supplied Map
+     * 
      * @param map
      */
-    public void putAll(Map<K,V> map);
+    public void putAll(Map<K, V> map);
 
     /**
      * @return a Set of all the keys
@@ -109,30 +111,30 @@
     public Set<K> keySet();
 
     /**
-     * @return a collection of all the values - the values will be lazily pulled out of the
-     * store if iterated etc.
+     * @return a collection of all the values - the values will be lazily pulled
+     *         out of the store if iterated etc.
      */
     public Collection<V> values();
 
     /**
-     * @return a Set of all the Map.Entry instances - the values will be lazily pulled out of the
-     * store if iterated etc.
+     * @return a Set of all the Map.Entry instances - the values will be lazily
+     *         pulled out of the store if iterated etc.
      */
-    public Set<Map.Entry<K,V>> entrySet();
+    public Set<Map.Entry<K, V>> entrySet();
 
-   
     /**
      * Add an entry
+     * 
      * @param key
      * @param value
      * @return the old value for the key
      */
-    public V put(K key,V value);
-
+    public V put(K key, V value);
 
     /**
      * remove an entry associated with the key
-     * @param key 
+     * 
+     * @param key
      * @return the old value assocaited with the key or null
      */
     public V remove(K key);
@@ -141,77 +143,83 @@
      * empty the container
      */
     public void clear();
-    
+
     /**
      * Add an entry to the Store Map
+     * 
      * @param key
      * @param Value
      * @return the StoreEntry associated with the entry
      */
     public StoreEntry place(K key, V Value);
-    
+
     /**
      * Remove an Entry from ther Map
+     * 
      * @param entry
      */
     public void remove(StoreEntry entry);
-    
+
     /**
      * Get the Key object from it's location
+     * 
      * @param keyLocation
      * @return the key for the entry
      */
     public K getKey(StoreEntry keyLocation);
-    
+
     /**
      * Get the value from it's location
+     * 
      * @param Valuelocation
      * @return the Object
      */
     public V getValue(StoreEntry Valuelocation);
-    
-    /** Get the StoreEntry for the first value in the Map
-    * 
-    * @return the first StoreEntry or null if the map is empty
-    */
-   public StoreEntry getFirst();
-
-   /**
-    * Get the StoreEntry for the last value item of the Map
-    * 
-    * @return the last StoreEntry or null if the list is empty
-    */
-   public StoreEntry getLast();
-
-   /**
-    * Get the next StoreEntry value from the map
-    * 
-    * @param entry
-    * @return the next StoreEntry or null
-    */
-   public StoreEntry getNext(StoreEntry entry);
-
-   /**
-    * Get the previous StoreEntry from the map
-    * 
-    * @param entry
-    * @return the previous store entry or null
-    */
-   public StoreEntry getPrevious(StoreEntry entry);
-
-   
-   /**
-    * It's possible that a StoreEntry could be come stale
-    * this will return an upto date entry for the StoreEntry position
-    * @param entry old entry
-    * @return a refreshed StoreEntry
-    */
-   public StoreEntry refresh(StoreEntry entry);
-   
-   /**
-    * Get the StoreEntry associated with the key
-    * @param key
-    * @return the StoreEntry
-    */
-   public StoreEntry getEntry(K key);
+
+    /**
+     * Get the StoreEntry for the first value in the Map
+     * 
+     * @return the first StoreEntry or null if the map is empty
+     */
+    public StoreEntry getFirst();
+
+    /**
+     * Get the StoreEntry for the last value item of the Map
+     * 
+     * @return the last StoreEntry or null if the list is empty
+     */
+    public StoreEntry getLast();
+
+    /**
+     * Get the next StoreEntry value from the map
+     * 
+     * @param entry
+     * @return the next StoreEntry or null
+     */
+    public StoreEntry getNext(StoreEntry entry);
+
+    /**
+     * Get the previous StoreEntry from the map
+     * 
+     * @param entry
+     * @return the previous store entry or null
+     */
+    public StoreEntry getPrevious(StoreEntry entry);
+
+    /**
+     * It's possible that a StoreEntry could be come stale this will return an
+     * upto date entry for the StoreEntry position
+     * 
+     * @param entry old entry
+     * @return a refreshed StoreEntry
+     */
+    public StoreEntry refresh(StoreEntry entry);
+
+    /**
+     * Get the StoreEntry associated with the key
+     * 
+     * @param key
+     * @return the StoreEntry
+     */
+    public StoreEntry getEntry(K key);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java Wed Aug  8 11:56:59 2007
@@ -20,6 +20,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import org.apache.activemq.command.MessageId;
+
 /**
  * Implementation of a Marshaller for MessageIds
  * 
@@ -33,7 +34,7 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(MessageId object,DataOutput dataOut) throws IOException{
+    public void writePayload(MessageId object, DataOutput dataOut) throws IOException {
         dataOut.writeUTF(object.toString());
     }
 
@@ -44,7 +45,7 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public MessageId readPayload(DataInput dataIn) throws IOException{
+    public MessageId readPayload(DataInput dataIn) throws IOException {
         return new MessageId(dataIn.readUTF());
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java Wed Aug  8 11:56:59 2007
@@ -27,7 +27,7 @@
  * 
  * @version $Revision: 1.2 $
  */
-public class ObjectMarshaller implements Marshaller{
+public class ObjectMarshaller implements Marshaller {
 
     /**
      * Write the payload of this entry to the RawContainer
@@ -36,12 +36,12 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(Object object,DataOutput dataOut) throws IOException{
-        ByteArrayOutputStream bytesOut=new ByteArrayOutputStream();
-        ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
+    public void writePayload(Object object, DataOutput dataOut) throws IOException {
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        ObjectOutputStream objectOut = new ObjectOutputStream(bytesOut);
         objectOut.writeObject(object);
         objectOut.close();
-        byte[] data=bytesOut.toByteArray();
+        byte[] data = bytesOut.toByteArray();
         dataOut.writeInt(data.length);
         dataOut.write(data);
     }
@@ -53,15 +53,15 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public Object readPayload(DataInput dataIn) throws IOException{
-        int size=dataIn.readInt();
-        byte[] data=new byte[size];
+    public Object readPayload(DataInput dataIn) throws IOException {
+        int size = dataIn.readInt();
+        byte[] data = new byte[size];
         dataIn.readFully(data);
-        ByteArrayInputStream bytesIn=new ByteArrayInputStream(data);
-        ObjectInputStream objectIn=new ObjectInputStream(bytesIn);
-        try{
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
+        ObjectInputStream objectIn = new ObjectInputStream(bytesIn);
+        try {
             return objectIn.readObject();
-        }catch(ClassNotFoundException e){
+        } catch (ClassNotFoundException e) {
             throw new IOException(e.getMessage());
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java Wed Aug  8 11:56:59 2007
@@ -16,47 +16,48 @@
  */
 package org.apache.activemq.kaha;
 
-
 /**
-*Runtime exception for the Store
-* 
-* @version $Revision: 1.2 $
-*/
-
-public class RuntimeStoreException extends RuntimeException{
-    
-   
-    private static final long serialVersionUID=8807084681372365173L;
+ * Runtime exception for the Store
+ * 
+ * @version $Revision: 1.2 $
+ */
+
+public class RuntimeStoreException extends RuntimeException {
+
+    private static final long serialVersionUID = 8807084681372365173L;
 
     /**
      * Constructor
      */
-    public RuntimeStoreException(){
+    public RuntimeStoreException() {
         super();
     }
 
     /**
      * Constructor
+     * 
      * @param message
      */
-    public RuntimeStoreException(String message){
+    public RuntimeStoreException(String message) {
         super(message);
     }
 
     /**
      * Constructor
+     * 
      * @param message
      * @param cause
      */
-    public RuntimeStoreException(String message,Throwable cause){
-        super(message,cause);
+    public RuntimeStoreException(String message, Throwable cause) {
+        super(message, cause);
     }
 
     /**
      * Constructor
+     * 
      * @param cause
      */
-    public RuntimeStoreException(Throwable cause){
+    public RuntimeStoreException(Throwable cause) {
         super(cause);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Wed Aug  8 11:56:59 2007
@@ -18,37 +18,38 @@
 
 import java.io.IOException;
 import java.util.Set;
+
 /**
  * A Store is holds persistent containers
  * 
  * @version $Revision: 1.2 $
  */
-public interface Store{
+public interface Store {
     /**
      * Defauly container name
      */
-    public static final String DEFAULT_CONTAINER_NAME="kaha";
+    public static final String DEFAULT_CONTAINER_NAME = "kaha";
 
     /**
      * Byte Marshaller
      */
     public final static Marshaller BytesMarshaller = new BytesMarshaller();
-    
+
     /**
      * Object Marshaller
      */
     public final static Marshaller ObjectMarshaller = new ObjectMarshaller();
-    
+
     /**
      * String Marshaller
      */
     public final static Marshaller StringMarshaller = new StringMarshaller();
-    
-    
+
     /**
      * Command Marshaller
      */
     public final static Marshaller CommandMarshaller = new CommandMarshaller();
+
     /**
      * close the store
      * 
@@ -86,46 +87,50 @@
      * @throws IOException
      */
     public boolean doesMapContainerExist(Object id) throws IOException;
-    
+
     /**
      * Checks if a MapContainer exists in the named container
      * 
      * @param id
-     * @param containerName 
+     * @param containerName
      * @return new MapContainer
      * @throws IOException
      */
-    public boolean doesMapContainerExist(Object id,String containerName) throws IOException;
+    public boolean doesMapContainerExist(Object id, String containerName) throws IOException;
 
     /**
-     * Get a MapContainer with the given id - the MapContainer is created if needed
+     * Get a MapContainer with the given id - the MapContainer is created if
+     * needed
      * 
      * @param id
      * @return container for the associated id or null if it doesn't exist
      * @throws IOException
      */
-     public MapContainer getMapContainer(Object id) throws IOException;
+    public MapContainer getMapContainer(Object id) throws IOException;
 
     /**
-     * Get a MapContainer with the given id - the MapContainer is created if needed
+     * Get a MapContainer with the given id - the MapContainer is created if
+     * needed
      * 
      * @param id
      * @param containerName
      * @return container for the associated id or null if it doesn't exist
      * @throws IOException
      */
-    public MapContainer getMapContainer(Object id,String containerName) throws IOException;
-    
+    public MapContainer getMapContainer(Object id, String containerName) throws IOException;
+
     /**
-     * Get a MapContainer with the given id - the MapContainer is created if needed
+     * Get a MapContainer with the given id - the MapContainer is created if
+     * needed
      * 
      * @param id
      * @param containerName
-     * @param persistentIndex 
+     * @param persistentIndex
      * @return container for the associated id or null if it doesn't exist
      * @throws IOException
      */
-    public MapContainer getMapContainer(Object id,String containerName,boolean persistentIndex) throws IOException;
+    public MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
+        throws IOException;
 
     /**
      * delete a container from the default container
@@ -134,18 +139,19 @@
      * @throws IOException
      */
     public void deleteMapContainer(Object id) throws IOException;
-    
+
     /**
      * delete a MapContainer from the name container
      * 
      * @param id
-     * @param containerName 
+     * @param containerName
      * @throws IOException
      */
-    public void deleteMapContainer(Object id,String containerName) throws IOException;
-    
+    public void deleteMapContainer(Object id, String containerName) throws IOException;
+
     /**
      * Delete Map container
+     * 
      * @param id
      * @throws IOException
      */
@@ -167,16 +173,16 @@
      * @throws IOException
      */
     public boolean doesListContainerExist(Object id) throws IOException;
-    
+
     /**
      * Checks if a ListContainer exists in the named container
      * 
      * @param id
-     * @param containerName 
+     * @param containerName
      * @return new MapContainer
      * @throws IOException
      */
-    public boolean doesListContainerExist(Object id,String containerName) throws IOException;
+    public boolean doesListContainerExist(Object id, String containerName) throws IOException;
 
     /**
      * Get a ListContainer with the given id and creates it if it doesn't exist
@@ -185,7 +191,7 @@
      * @return container for the associated id or null if it doesn't exist
      * @throws IOException
      */
-     public ListContainer getListContainer(Object id) throws IOException;
+    public ListContainer getListContainer(Object id) throws IOException;
 
     /**
      * Get a ListContainer with the given id and creates it if it doesn't exist
@@ -195,18 +201,19 @@
      * @return container for the associated id or null if it doesn't exist
      * @throws IOException
      */
-    public ListContainer getListContainer(Object id,String containerName) throws IOException;
-    
+    public ListContainer getListContainer(Object id, String containerName) throws IOException;
+
     /**
      * Get a ListContainer with the given id and creates it if it doesn't exist
      * 
      * @param id
      * @param containerName
-     * @param persistentIndex 
+     * @param persistentIndex
      * @return container for the associated id or null if it doesn't exist
      * @throws IOException
      */
-    public ListContainer getListContainer(Object id,String containerName,boolean persistentIndex) throws IOException;
+    public ListContainer getListContainer(Object id, String containerName, boolean persistentIndex)
+        throws IOException;
 
     /**
      * delete a ListContainer from the default container
@@ -215,18 +222,19 @@
      * @throws IOException
      */
     public void deleteListContainer(Object id) throws IOException;
-    
+
     /**
      * delete a ListContainer from the named container
      * 
      * @param id
-     * @param containerName 
+     * @param containerName
      * @throws IOException
      */
-    public void deleteListContainer(Object id,String containerName) throws IOException;
+    public void deleteListContainer(Object id, String containerName) throws IOException;
 
     /**
      * delete a list container
+     * 
      * @param id
      * @throws IOException
      */
@@ -239,7 +247,7 @@
      * @throws IOException
      */
     public Set<ContainerId> getListContainerIds() throws IOException;
-    
+
     /**
      * @return the maxDataFileLength
      */
@@ -249,20 +257,21 @@
      * @param maxDataFileLength the maxDataFileLength to set
      */
     public void setMaxDataFileLength(long maxDataFileLength);
-    
+
     /**
      * @see org.apache.activemq.kaha.IndexTypes
      * @return the default index type
      */
     public String getIndexTypeAsString();
-    
+
     /**
      * Set the default index type
+     * 
      * @param type
      * @see org.apache.activemq.kaha.IndexTypes
      */
     public void setIndexTypeAsString(String type);
-    
+
     /**
      * @return true if the store has been initialized
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java Wed Aug  8 11:56:59 2007
@@ -25,28 +25,29 @@
  * 
  * @version $Revision: 1.2 $
  */
-public class StoreFactory{
-   
+public class StoreFactory {
 
     /**
      * open or create a Store
+     * 
      * @param name
      * @param mode
      * @return the opened/created store
      * @throws IOException
      */
-    public static Store open(String name,String mode) throws IOException{
-        return new KahaStore(name,mode);
+    public static Store open(String name, String mode) throws IOException {
+        return new KahaStore(name, mode);
     }
-    
+
     /**
      * Delete a database
+     * 
      * @param name of the database
      * @return true if successful
-     * @throws IOException 
+     * @throws IOException
      */
-    public static boolean delete(String name) throws IOException{
-        KahaStore store = new KahaStore(name,"rw");
+    public static boolean delete(String name) throws IOException {
+        KahaStore store = new KahaStore(name, "rw");
         return store.delete();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java Wed Aug  8 11:56:59 2007
@@ -19,6 +19,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+
 /**
  * Implementation of a Marshaller for Strings
  * 
@@ -32,7 +33,7 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(String object,DataOutput dataOut) throws IOException{
+    public void writePayload(String object, DataOutput dataOut) throws IOException {
         dataOut.writeUTF(object);
     }
 
@@ -43,7 +44,7 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public String readPayload(DataInput dataIn) throws IOException{
+    public String readPayload(DataInput dataIn) throws IOException {
         return dataIn.readUTF();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Wed Aug  8 11:56:59 2007
@@ -8,35 +8,32 @@
 
 public interface DataManager {
 
-	String getName();
+    String getName();
 
-	Object readItem(Marshaller marshaller, StoreLocation item)
-			throws IOException;
+    Object readItem(Marshaller marshaller, StoreLocation item) throws IOException;
 
-	StoreLocation storeDataItem(Marshaller marshaller, Object payload)
-			throws IOException;
+    StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException;
 
-	StoreLocation storeRedoItem(Object payload) throws IOException;
+    StoreLocation storeRedoItem(Object payload) throws IOException;
 
-	void updateItem(StoreLocation location, Marshaller marshaller,
-			Object payload) throws IOException;
+    void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException;
 
-	void recoverRedoItems(RedoListener listener) throws IOException;
+    void recoverRedoItems(RedoListener listener) throws IOException;
 
-	void close() throws IOException;
+    void close() throws IOException;
 
-	void force() throws IOException;
+    void force() throws IOException;
 
-	boolean delete() throws IOException;
+    boolean delete() throws IOException;
 
-	void addInterestInFile(int file) throws IOException;
+    void addInterestInFile(int file) throws IOException;
 
-	void removeInterestInFile(int file) throws IOException;
+    void removeInterestInFile(int file) throws IOException;
 
-	void consolidateDataFiles() throws IOException;
+    void consolidateDataFiles() throws IOException;
 
-	Marshaller getRedoMarshaller();
+    Marshaller getRedoMarshaller();
 
-	void setRedoMarshaller(Marshaller redoMarshaller);
+    void setRedoMarshaller(Marshaller redoMarshaller);
 
-}
\ No newline at end of file
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Wed Aug  8 11:56:59 2007
@@ -35,58 +35,55 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
-* A container of roots for other Containers
-* 
-* @version $Revision: 1.2 $
-*/
+ * A container of roots for other Containers
+ * 
+ * @version $Revision: 1.2 $
+ */
 
 class IndexRootContainer {
-    private static final Log log=LogFactory.getLog(IndexRootContainer.class);
+    private static final Log log = LogFactory.getLog(IndexRootContainer.class);
     protected static final Marshaller rootMarshaller = Store.ObjectMarshaller;
     protected IndexItem root;
     protected IndexManager indexManager;
     protected DataManager dataManager;
     protected Map map = new ConcurrentHashMap();
     protected LinkedList list = new LinkedList();
-    
-    
-    IndexRootContainer(IndexItem root,IndexManager im,DataManager dfm) throws IOException{
-        this.root=root;
-        this.indexManager=im;
-        this.dataManager=dfm;
-        long nextItem=root.getNextItem();
-        while(nextItem!=Item.POSITION_NOT_SET){
-            StoreEntry item=indexManager.getIndex(nextItem);
-            StoreLocation data=item.getKeyDataItem();
-            Object key = dataManager.readItem(rootMarshaller,data);
-            map.put(key,item);
+
+    IndexRootContainer(IndexItem root, IndexManager im, DataManager dfm) throws IOException {
+        this.root = root;
+        this.indexManager = im;
+        this.dataManager = dfm;
+        long nextItem = root.getNextItem();
+        while (nextItem != Item.POSITION_NOT_SET) {
+            StoreEntry item = indexManager.getIndex(nextItem);
+            StoreLocation data = item.getKeyDataItem();
+            Object key = dataManager.readItem(rootMarshaller, data);
+            map.put(key, item);
             list.add(item);
-            nextItem=item.getNextItem();
+            nextItem = item.getNextItem();
             dataManager.addInterestInFile(item.getKeyFile());
         }
     }
-    
-    Set getKeys(){
+
+    Set getKeys() {
         return map.keySet();
     }
-    
-    
-    
-    IndexItem addRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
-        if (map.containsKey(key)){
-            removeRoot(containerIndexManager,key);
+
+    IndexItem addRoot(IndexManager containerIndexManager, ContainerId key) throws IOException {
+        if (map.containsKey(key)) {
+            removeRoot(containerIndexManager, key);
         }
-        
+
         StoreLocation data = dataManager.storeDataItem(rootMarshaller, key);
         IndexItem newRoot = indexManager.createNewIndex();
         newRoot.setKeyData(data);
         IndexItem containerRoot = containerIndexManager.createNewIndex();
         containerIndexManager.storeIndex(containerRoot);
         newRoot.setValueOffset(containerRoot.getOffset());
-       
-        IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();
-        last=last==null?root:last;
-        long prev=last.getOffset();
+
+        IndexItem last = list.isEmpty() ? null : (IndexItem)list.getLast();
+        last = last == null ? root : last;
+        long prev = last.getOffset();
         newRoot.setPreviousItem(prev);
         indexManager.storeIndex(newRoot);
         last.setNextItem(newRoot.getOffset());
@@ -95,25 +92,25 @@
         list.add(newRoot);
         return containerRoot;
     }
-    
-    void removeRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
-        StoreEntry oldRoot=(StoreEntry)map.remove(key);
-        if(oldRoot!=null){
+
+    void removeRoot(IndexManager containerIndexManager, ContainerId key) throws IOException {
+        StoreEntry oldRoot = (StoreEntry)map.remove(key);
+        if (oldRoot != null) {
             dataManager.removeInterestInFile(oldRoot.getKeyFile());
             // get the container root
-            IndexItem containerRoot=containerIndexManager.getIndex(oldRoot.getValueOffset());
-            if(containerRoot!=null){
+            IndexItem containerRoot = containerIndexManager.getIndex(oldRoot.getValueOffset());
+            if (containerRoot != null) {
                 containerIndexManager.freeIndex(containerRoot);
             }
-            int index=list.indexOf(oldRoot);
-            IndexItem prev=index>0?(IndexItem)list.get(index-1):root;
-            prev=prev==null?root:prev;
-            IndexItem next=index<(list.size()-1)?(IndexItem)list.get(index+1):null;
-            if(next!=null){
+            int index = list.indexOf(oldRoot);
+            IndexItem prev = index > 0 ? (IndexItem)list.get(index - 1) : root;
+            prev = prev == null ? root : prev;
+            IndexItem next = index < (list.size() - 1) ? (IndexItem)list.get(index + 1) : null;
+            if (next != null) {
                 prev.setNextItem(next.getOffset());
                 next.setPreviousItem(prev.getOffset());
                 indexManager.updateIndexes(next);
-            }else{
+            } else {
                 prev.setNextItem(Item.POSITION_NOT_SET);
             }
             indexManager.updateIndexes(prev);
@@ -121,19 +118,17 @@
             indexManager.freeIndex((IndexItem)oldRoot);
         }
     }
-    
-    IndexItem getRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
-        StoreEntry index =  (StoreEntry) map.get(key);
-        if (index != null){
+
+    IndexItem getRoot(IndexManager containerIndexManager, ContainerId key) throws IOException {
+        StoreEntry index = (StoreEntry)map.get(key);
+        if (index != null) {
             return containerIndexManager.getIndex(index.getValueOffset());
         }
         return null;
     }
-    
-    boolean doesRootExist(Object key){
+
+    boolean doesRootExist(Object key) {
         return map.containsKey(key);
     }
-
-    
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java Wed Aug  8 11:56:59 2007
@@ -18,27 +18,25 @@
 
 import java.io.IOException;
 
-
-
 /**
-* Exception thrown if the store is in use by another application
-* 
-* @version $Revision: 1.1.1.1 $
-*/
-public class StoreLockedExcpetion extends IOException{
+ * Exception thrown if the store is in use by another application
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StoreLockedExcpetion extends IOException {
 
-    private static final long serialVersionUID=3857646689671366926L;
+    private static final long serialVersionUID = 3857646689671366926L;
 
     /**
      * Default Constructor
      */
-    public StoreLockedExcpetion(){
+    public StoreLockedExcpetion() {
     }
 
     /**
      * @param s
      */
-    public StoreLockedExcpetion(String s){
+    public StoreLockedExcpetion(String s) {
         super(s);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java Wed Aug  8 11:56:59 2007
@@ -23,129 +23,130 @@
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
 import org.apache.activemq.util.ByteSequence;
+
 /**
- * Optimized Store reader and updater.  Single threaded and synchronous.  Use in conjunction 
- * with the DataFileAccessorPool of concurrent use.
+ * Optimized Store reader and updater. Single threaded and synchronous. Use in
+ * conjunction with the DataFileAccessorPool of concurrent use.
  * 
  * @version $Revision: 1.1.1.1 $
  */
 final class DataFileAccessor {
-    
-	private final DataFile dataFile;
-	private final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
-	private final RandomAccessFile file;
-	private boolean disposed;
-    
+
+    private final DataFile dataFile;
+    private final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
+    private final RandomAccessFile file;
+    private boolean disposed;
+
     /**
      * Construct a Store reader
      * 
      * @param fileId
-     * @throws IOException 
+     * @throws IOException
      */
-    public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException{
-		this.dataFile = dataFile;
-		this.inflightWrites = dataManager.getInflightWrites();
-		this.file = dataFile.openRandomAccessFile(false);
+    public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException {
+        this.dataFile = dataFile;
+        this.inflightWrites = dataManager.getInflightWrites();
+        this.file = dataFile.openRandomAccessFile(false);
+    }
+
+    public DataFile getDataFile() {
+        return dataFile;
     }
 
-	public DataFile getDataFile() {
-		return dataFile;
-	}
-
-	public void dispose() {
-    	if( disposed )
-    		return;
-    	disposed=true;
+    public void dispose() {
+        if (disposed)
+            return;
+        disposed = true;
         try {
-        	dataFile.closeRandomAccessFile(file);
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
+            dataFile.closeRandomAccessFile(file);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
     }
-    
+
     public ByteSequence readRecord(Location location) throws IOException {
-    	
-    	if( !location.isValid() )
-    		throw new IOException("Invalid location: "+location);
-    	    	
-    	WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
-    	if( asyncWrite!= null ) {
-    		return asyncWrite.data;
-    	}
-
-		try {
-
-			if( location.getSize()==Location.NOT_SET ) {
-		        file.seek(location.getOffset());
-		        location.setSize(file.readInt());
-				file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
-	    	} else {
-				file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
-	    	}
-			
-			byte[] data=new byte[location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
-			file.readFully(data);
-			return new ByteSequence(data, 0, data.length);
-
-		} catch (RuntimeException e) {
-			throw new IOException("Invalid location: "+location+", : "+e);
-		}
+
+        if (!location.isValid())
+            throw new IOException("Invalid location: " + location);
+
+        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            return asyncWrite.data;
+        }
+
+        try {
+
+            if (location.getSize() == Location.NOT_SET) {
+                file.seek(location.getOffset());
+                location.setSize(file.readInt());
+                file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+            } else {
+                file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+            }
+
+            byte[] data = new byte[location.getSize() - AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
+            file.readFully(data);
+            return new ByteSequence(data, 0, data.length);
+
+        } catch (RuntimeException e) {
+            throw new IOException("Invalid location: " + location + ", : " + e);
+        }
     }
-    
+
     public void readLocationDetails(Location location) throws IOException {
-    	WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
-    	if( asyncWrite!= null ) {
-    		location.setSize(asyncWrite.location.getSize());
-    		location.setType(asyncWrite.location.getType());
-    	} else {
-	        file.seek(location.getOffset());
-	        location.setSize(file.readInt());
-	        location.setType(file.readByte());
-    	}
-    }
-
-	public boolean readLocationDetailsAndValidate(Location location) {
-    	try {
-        	WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
-        	if( asyncWrite!= null ) {
-        		location.setSize(asyncWrite.location.getSize());
-        		location.setType(asyncWrite.location.getType());
-        	} else {
-		        file.seek(location.getOffset());
-		        location.setSize(file.readInt());
-		        location.setType(file.readByte());
-				
-				byte data[] = new byte[3];
-				file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
-				file.readFully(data);
-				if( data[0] != AsyncDataManager.ITEM_HEAD_SOR[0] ||
-					data[1] != AsyncDataManager.ITEM_HEAD_SOR[1] ||
-					data[2] != AsyncDataManager.ITEM_HEAD_SOR[2] ) {
-					return false;
-				}
-				file.seek(location.getOffset()+location.getSize()-AsyncDataManager.ITEM_FOOT_SPACE);
-				file.readFully(data);
-				if( data[0] != AsyncDataManager.ITEM_HEAD_EOR[0] ||
-					data[1] != AsyncDataManager.ITEM_HEAD_EOR[1] ||
-					data[2] != AsyncDataManager.ITEM_HEAD_EOR[2] ) {
-					return false;
-				}
-			}
-		} catch (IOException e) {
-			return false;
-		}
-		return true;
-	}
-
-	public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
-		
-		file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
-		int size = Math.min(data.getLength(), location.getSize());
-		file.write(data.getData(), data.getOffset(), size);
-		if( sync ) {
-			file.getFD().sync();
-		}
-        		
-	}
+        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            location.setSize(asyncWrite.location.getSize());
+            location.setType(asyncWrite.location.getType());
+        } else {
+            file.seek(location.getOffset());
+            location.setSize(file.readInt());
+            location.setType(file.readByte());
+        }
+    }
+
+    public boolean readLocationDetailsAndValidate(Location location) {
+        try {
+            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+            if (asyncWrite != null) {
+                location.setSize(asyncWrite.location.getSize());
+                location.setType(asyncWrite.location.getType());
+            } else {
+                file.seek(location.getOffset());
+                location.setSize(file.readInt());
+                location.setType(file.readByte());
+
+                byte data[] = new byte[3];
+                file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
+                file.readFully(data);
+                if (data[0] != AsyncDataManager.ITEM_HEAD_SOR[0]
+                    || data[1] != AsyncDataManager.ITEM_HEAD_SOR[1]
+                    || data[2] != AsyncDataManager.ITEM_HEAD_SOR[2]) {
+                    return false;
+                }
+                file.seek(location.getOffset() + location.getSize() - AsyncDataManager.ITEM_FOOT_SPACE);
+                file.readFully(data);
+                if (data[0] != AsyncDataManager.ITEM_HEAD_EOR[0]
+                    || data[1] != AsyncDataManager.ITEM_HEAD_EOR[1]
+                    || data[2] != AsyncDataManager.ITEM_HEAD_EOR[2]) {
+                    return false;
+                }
+            }
+        } catch (IOException e) {
+            return false;
+        }
+        return true;
+    }
+
+    public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
+
+        file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+        int size = Math.min(data.getLength(), location.getSize());
+        file.write(data.getData(), data.getOffset(), size);
+        if (sync) {
+            file.getFD().sync();
+        }
+
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Wed Aug  8 11:56:59 2007
@@ -27,348 +27,351 @@
 import org.apache.activemq.util.LinkedNode;
 
 /**
- * An optimized writer to do batch appends to a data file.  This object is thread safe 
- * and gains throughput as you increase the number of concurrent writes it does.
+ * An optimized writer to do batch appends to a data file. This object is thread
+ * safe and gains throughput as you increase the number of concurrent writes it
+ * does.
  * 
  * @version $Revision: 1.1.1.1 $
  */
 class DataFileAppender {
-    
-	protected static final byte []RESERVED_SPACE= new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
-	protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
-	int MAX_WRITE_BATCH_SIZE = 1024*1024*4;
-	
-	static public class WriteKey {
-	    private final int file;
-	    private final long offset;
-	    private final int hash;
-
-		public WriteKey(Location item){
-	    	file = item.getDataFileId();
-	    	offset = item.getOffset();
-	    	// TODO: see if we can build a better hash  
-	    	hash = (int) (file  ^ offset);
-	    }
-	 
-	    public int hashCode() {
-	    	return hash;  
-	    }
-	    
-	    public boolean equals(Object obj){
-            if(obj instanceof WriteKey){
-                WriteKey di=(WriteKey)obj;
-                return di.file==file&&di.offset==offset;
+
+    protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
+    protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
+    int MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
+
+    static public class WriteKey {
+        private final int file;
+        private final long offset;
+        private final int hash;
+
+        public WriteKey(Location item) {
+            file = item.getDataFileId();
+            offset = item.getOffset();
+            // TODO: see if we can build a better hash
+            hash = (int)(file ^ offset);
+        }
+
+        public int hashCode() {
+            return hash;
+        }
+
+        public boolean equals(Object obj) {
+            if (obj instanceof WriteKey) {
+                WriteKey di = (WriteKey)obj;
+                return di.file == file && di.offset == offset;
             }
             return false;
         }
-	}
-	
-	public class WriteBatch {
-		
-		public final DataFile dataFile;
-		public final WriteCommand first;
-		public final CountDownLatch latch = new CountDownLatch(1);
-		public int size;
-		
-		public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
-			this.dataFile=dataFile;
-			this.first=write;
-			size+=write.location.getSize();
-		}
-		
-		public boolean canAppend(DataFile dataFile, WriteCommand write) {
-			if( dataFile != this.dataFile ) 
-				return false;
-			if( size+write.location.getSize() >= MAX_WRITE_BATCH_SIZE )
-				return false;
-			return true;
-		}
-		
-		public void append(WriteCommand write) throws IOException {
-			this.first.getTailNode().linkAfter(write);
-			size+=write.location.getSize();
-		}
-	}
-	
-    public static class WriteCommand extends LinkedNode {    	
-		public final Location location;
-		public final ByteSequence data;
-		final boolean sync;
-		
-		public WriteCommand(Location location, ByteSequence data, boolean sync) {
-			this.location = location;
-			this.data = data;
-			this.sync = sync;
-		}
     }
-    
-    protected final AsyncDataManager dataManager;    
-    
+
+    public class WriteBatch {
+
+        public final DataFile dataFile;
+        public final WriteCommand first;
+        public final CountDownLatch latch = new CountDownLatch(1);
+        public int size;
+
+        public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+            this.dataFile = dataFile;
+            this.first = write;
+            size += write.location.getSize();
+        }
+
+        public boolean canAppend(DataFile dataFile, WriteCommand write) {
+            if (dataFile != this.dataFile)
+                return false;
+            if (size + write.location.getSize() >= MAX_WRITE_BATCH_SIZE)
+                return false;
+            return true;
+        }
+
+        public void append(WriteCommand write) throws IOException {
+            this.first.getTailNode().linkAfter(write);
+            size += write.location.getSize();
+        }
+    }
+
+    public static class WriteCommand extends LinkedNode {
+        public final Location location;
+        public final ByteSequence data;
+        final boolean sync;
+
+        public WriteCommand(Location location, ByteSequence data, boolean sync) {
+            this.location = location;
+            this.data = data;
+            this.sync = sync;
+        }
+    }
+
+    protected final AsyncDataManager dataManager;
+
     protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
-    
+
     protected final Object enqueueMutex = new Object();
-    protected WriteBatch nextWriteBatch; 
-    
+    protected WriteBatch nextWriteBatch;
+
     private boolean running;
     protected boolean shutdown;
     protected IOException firstAsyncException;
     protected final CountDownLatch shutdownDone = new CountDownLatch(1);
-	private Thread thread;
-    
+    private Thread thread;
+
     /**
      * Construct a Store writer
      * 
      * @param fileId
      */
-    public DataFileAppender(AsyncDataManager dataManager){
-        this.dataManager=dataManager;
+    public DataFileAppender(AsyncDataManager dataManager) {
+        this.dataManager = dataManager;
         this.inflightWrites = this.dataManager.getInflightWrites();
     }
-    
+
     /**
-     * @param type 
+     * @param type
      * @param marshaller
      * @param payload
-     * @param type 
-     * @param sync 
+     * @param type
+     * @param sync
      * @return
      * @throws IOException
-     * @throws  
-     * @throws  
+     * @throws
+     * @throws
      */
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-    	        
+
         // Write the packet our internal buffer.
-    	int size = data.getLength()+AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
-    	
-        final Location location=new Location();
-        location.setSize(size);   
+        int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
         location.setType(type);
-        
+
         WriteBatch batch;
-    	WriteCommand write = new WriteCommand(location, data, sync);
+        WriteCommand write = new WriteCommand(location, data, sync);
 
-    	// Locate datafile and enqueue into the executor in sychronized block so that 
-        // writes get equeued onto the executor in order that they were assigned by 
+        // Locate datafile and enqueue into the executor in sychronized block so
+        // that
+        // writes get equeued onto the executor in order that they were assigned
+        // by
         // the data manager (which is basically just appending)
-    	
-        synchronized(this) {
+
+        synchronized (this) {
             // Find the position where this item will land at.
-	        DataFile dataFile=dataManager.allocateLocation(location);
-        	batch = enqueue(dataFile, write);
+            DataFile dataFile = dataManager.allocateLocation(location);
+            batch = enqueue(dataFile, write);
         }
         location.setLatch(batch.latch);
-    	if( sync ) {
-    		try {
-    			batch.latch.await();
-			} catch (InterruptedException e) {
-				throw new InterruptedIOException();
-			}
-    	} else {
+        if (sync) {
+            try {
+                batch.latch.await();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+        } else {
             inflightWrites.put(new WriteKey(location), write);
-    	}
-    	
+        }
+
         return location;
     }
 
     private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
-    	synchronized(enqueueMutex) {
-        	WriteBatch rc=null;
-        	if( shutdown ) {
-        		throw new IOException("Async Writter Thread Shutdown");
-        	}
-        	if( firstAsyncException !=null )
-        		throw firstAsyncException;
-        	
-        	if( !running ) {
-        		running=true;
-        		thread = new Thread() {
-        			public void run() {
-        				processQueue();
-        			}
-        		};
-        		thread.setPriority(Thread.MAX_PRIORITY);
-        		thread.setDaemon(true);
-        		thread.setName("ActiveMQ Data File Writer");
-        		thread.start();
-        	}
-        	
-        	if( nextWriteBatch == null ) {
-        		nextWriteBatch = new WriteBatch(dataFile,write);
-        		rc = nextWriteBatch;
-	    		enqueueMutex.notify();
-        	} else {
-        		// Append to current batch if possible..
-        		if( nextWriteBatch.canAppend(dataFile, write) ) {
-        			nextWriteBatch.append(write);
-            		rc = nextWriteBatch;
-        		} else {
-            		// Otherwise wait for the queuedCommand to be null 
-        	    	try {
-        		    	while( nextWriteBatch!=null ) {
-        		    		enqueueMutex.wait();
-        		    	}    			
-    				} catch (InterruptedException e) {
-    					throw new InterruptedIOException();
-    				}
-    	        	if( shutdown ) {
-    	        		throw new IOException("Async Writter Thread Shutdown");
-    	        	}
-    	        	
-        	    	// Start a new batch.
-            		nextWriteBatch = new WriteBatch(dataFile,write);
-            		rc = nextWriteBatch;
-    	    		enqueueMutex.notify();
-        		}
-        	}
-        	return rc;
-    	}
-	}
+        synchronized (enqueueMutex) {
+            WriteBatch rc = null;
+            if (shutdown) {
+                throw new IOException("Async Writter Thread Shutdown");
+            }
+            if (firstAsyncException != null)
+                throw firstAsyncException;
+
+            if (!running) {
+                running = true;
+                thread = new Thread() {
+                    public void run() {
+                        processQueue();
+                    }
+                };
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.setDaemon(true);
+                thread.setName("ActiveMQ Data File Writer");
+                thread.start();
+            }
+
+            if (nextWriteBatch == null) {
+                nextWriteBatch = new WriteBatch(dataFile, write);
+                rc = nextWriteBatch;
+                enqueueMutex.notify();
+            } else {
+                // Append to current batch if possible..
+                if (nextWriteBatch.canAppend(dataFile, write)) {
+                    nextWriteBatch.append(write);
+                    rc = nextWriteBatch;
+                } else {
+                    // Otherwise wait for the queuedCommand to be null
+                    try {
+                        while (nextWriteBatch != null) {
+                            enqueueMutex.wait();
+                        }
+                    } catch (InterruptedException e) {
+                        throw new InterruptedIOException();
+                    }
+                    if (shutdown) {
+                        throw new IOException("Async Writter Thread Shutdown");
+                    }
+
+                    // Start a new batch.
+                    nextWriteBatch = new WriteBatch(dataFile, write);
+                    rc = nextWriteBatch;
+                    enqueueMutex.notify();
+                }
+            }
+            return rc;
+        }
+    }
 
     public void close() throws IOException {
-    	synchronized( enqueueMutex ) {
-    		if( shutdown == false ) {
-	    		shutdown = true;
-	    		if( running ) {
-    	    		enqueueMutex.notifyAll();
-	    		} else {
-	    			shutdownDone.countDown();
-	    		}
-    		}
-    	}
-    	
-    	try {
-			shutdownDone.await();
-		} catch (InterruptedException e) {
-			throw new InterruptedIOException();
-		}
-    	
+        synchronized (enqueueMutex) {
+            if (shutdown == false) {
+                shutdown = true;
+                if (running) {
+                    enqueueMutex.notifyAll();
+                } else {
+                    shutdownDone.countDown();
+                }
+            }
+        }
+
+        try {
+            shutdownDone.await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+
     }
 
     /**
-     * The async processing loop that writes to the data files and
-     * does the force calls.  
+     * The async processing loop that writes to the data files and does the
+     * force calls.
      * 
-     * Since the file sync() call is the slowest of all the operations, 
-     * this algorithm tries to 'batch' or group together several file sync() requests 
-     * into a single file sync() call. The batching is accomplished attaching the 
-     * same CountDownLatch instance to every force request in a group.
+     * Since the file sync() call is the slowest of all the operations, this
+     * algorithm tries to 'batch' or group together several file sync() requests
+     * into a single file sync() call. The batching is accomplished attaching
+     * the same CountDownLatch instance to every force request in a group.
      * 
      */
     protected void processQueue() {
-		DataFile dataFile=null;
-		RandomAccessFile file=null;
-    	try {
-    		
-    		DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
-	    	while( true ) {
-	    		
-	    		Object o = null;
-
-	    		// Block till we get a command.
-	    		synchronized(enqueueMutex) {
-	    			while( true ) {
-	    				if( shutdown ) {
-	    					o = SHUTDOWN_COMMAND;
-	    					break;
-	    				}
-	    				if( nextWriteBatch!=null ) {
-	    					o = nextWriteBatch;
-	    					nextWriteBatch=null;
-	    					break;
-	    				}
-	    				enqueueMutex.wait();
-	    			}
-	    			enqueueMutex.notify();
-	            }        
-	    		
-	    		
-	        	if( o == SHUTDOWN_COMMAND ) {
-	        		break;
-	        	} 
-	        	
-	        	WriteBatch wb = (WriteBatch) o;
-				if( dataFile != wb.dataFile ) {
-	        		if( file!=null ) {
-	        			dataFile.closeRandomAccessFile(file);
-	        		}
-	        		dataFile = wb.dataFile;
-	        		file = dataFile.openRandomAccessFile(true);
-	        	}
-	        	
-	        	WriteCommand write = wb.first;
-	        	
-	        	// Write all the data.
-				// Only need to seek to first location.. all others 
-				// are in sequence.
-	        	file.seek(write.location.getOffset());
-	        	
-	        	// 
-        		// is it just 1 big write?
-	        	if( wb.size == write.location.getSize() ) {
-	        		
-	        		// Just write it directly..
-		        	file.writeInt(write.location.getSize());
-		        	file.writeByte(write.location.getType());
-		        	file.write(RESERVED_SPACE);
-		        	file.write(AsyncDataManager.ITEM_HEAD_SOR);	        		
-		        	file.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
-		        	file.write(AsyncDataManager.ITEM_HEAD_EOR);
-		        	
-	        	} else {
-	        		
-	        		// Combine the smaller writes into 1 big buffer
-		        	while( write!=null ) {
-	
-		        		buff.writeInt(write.location.getSize());
-		        		buff.writeByte(write.location.getType());
-		        		buff.write(RESERVED_SPACE);
-		        		buff.write(AsyncDataManager.ITEM_HEAD_SOR);	        		
-		        		buff.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
-		        		buff.write(AsyncDataManager.ITEM_HEAD_EOR);
-		        		
-		        		write = (WriteCommand) write.getNext();
-		        	}
-		        	
-		        	// Now do the 1 big write.
-		        	ByteSequence sequence = buff.toByteSequence();
-		        	file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
-		        	buff.reset();
-	        	}
-	        	
-    			file.getFD().sync();
-    			
-    			WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
-    			dataManager.setLastAppendLocation( lastWrite.location );
-    			
-    			// Signal any waiting threads that the write is on disk.
-				wb.latch.countDown();
-    			
-    			// Now that the data is on disk, remove the writes from the in flight
-    			// cache.
-	        	write = wb.first;
-	        	while( write!=null ) {
-	        		if( !write.sync ) {
-	        			inflightWrites.remove(new WriteKey(write.location));
-	        		}
-	        		write = (WriteCommand) write.getNext();
-	        	}
-	    	}
-	    	buff.close();
-		} catch (IOException e) {
-	    	synchronized( enqueueMutex ) {
-	    		firstAsyncException = e;
-	    	}
-		} catch (InterruptedException e) {
-		} finally {
-    		try {
-				if( file!=null ) {
-					dataFile.closeRandomAccessFile(file);
-				}
-			} catch (IOException e) {
-			}
-    		shutdownDone.countDown();
-    	}
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        try {
+
+            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (shutdown) {
+                            o = SHUTDOWN_COMMAND;
+                            break;
+                        }
+                        if (nextWriteBatch != null) {
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notify();
+                }
+
+                if (o == SHUTDOWN_COMMAND) {
+                    break;
+                }
+
+                WriteBatch wb = (WriteBatch)o;
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile(true);
+                }
+
+                WriteCommand write = wb.first;
+
+                // Write all the data.
+                // Only need to seek to first location.. all others
+                // are in sequence.
+                file.seek(write.location.getOffset());
+
+                // 
+                // is it just 1 big write?
+                if (wb.size == write.location.getSize()) {
+
+                    // Just write it directly..
+                    file.writeInt(write.location.getSize());
+                    file.writeByte(write.location.getType());
+                    file.write(RESERVED_SPACE);
+                    file.write(AsyncDataManager.ITEM_HEAD_SOR);
+                    file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                    file.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+                } else {
+
+                    // Combine the smaller writes into 1 big buffer
+                    while (write != null) {
+
+                        buff.writeInt(write.location.getSize());
+                        buff.writeByte(write.location.getType());
+                        buff.write(RESERVED_SPACE);
+                        buff.write(AsyncDataManager.ITEM_HEAD_SOR);
+                        buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                        buff.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+                        write = (WriteCommand)write.getNext();
+                    }
+
+                    // Now do the 1 big write.
+                    ByteSequence sequence = buff.toByteSequence();
+                    file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                    buff.reset();
+                }
+
+                file.getFD().sync();
+
+                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+                dataManager.setLastAppendLocation(lastWrite.location);
+
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
+
+                // Now that the data is on disk, remove the writes from the in
+                // flight
+                // cache.
+                write = wb.first;
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    write = (WriteCommand)write.getNext();
+                }
+            }
+            buff.close();
+        } catch (IOException e) {
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+            }
+        } catch (InterruptedException e) {
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (IOException e) {
+            }
+            shutdownDone.countDown();
+        }
     }
-        
+
 }