You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/08 15:20:38 UTC

svn commit: r516048 [10/14] - in /activemq/trunk: activemq-book/src/docbkx/ activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/blob/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ ac...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Mar  8 06:20:29 2007
@@ -1,277 +1,277 @@
-/**
- * 
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.activemq.store.kahadaptor;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.kaha.ListContainer;
-import org.apache.activemq.kaha.MapContainer;
-import org.apache.activemq.kaha.Marshaller;
-import org.apache.activemq.kaha.Store;
-import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.TopicReferenceStore;
-
-public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
-
-    protected ListContainer<TopicSubAck> ackContainer;
-    private Map subscriberContainer;
-    private Store store;
-    protected Map subscriberMessages=new ConcurrentHashMap();
-
-    public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
-            MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        super(adapter,messageContainer,destination);
-        this.store=store;
-        this.ackContainer=ackContainer;
-        subscriberContainer=subsContainer;
-        // load all the Ack containers
-        for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
-            Object key=i.next();
-            addSubscriberMessageContainer(key);
-        }
-    }
-
-    protected MessageId getMessageId(Object object){
-        return new MessageId(((ReferenceRecord)object).getMessageId());
-    }
-
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        throw new RuntimeException("Use addMessageReference instead");
-    }
-
-    public synchronized Message getMessage(MessageId identity) throws IOException{
-        throw new RuntimeException("Use addMessageReference instead");
-    }
-
-    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
-        ReferenceRecord record=(ReferenceRecord)msg;
-        listener.recoverMessageReference(new MessageId(record.getMessageId()));
-    }
-
-    public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
-        final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
-        final int subscriberCount=subscriberMessages.size();
-        if(subscriberCount>0){
-            final StoreEntry messageEntry=messageContainer.place(messageId,record);
-            addInterest(record);
-            final TopicSubAck tsa=new TopicSubAck();
-            tsa.setCount(subscriberCount);
-            tsa.setMessageEntry(messageEntry);
-            final StoreEntry ackEntry=ackContainer.placeLast(tsa);
-            for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
-                final TopicSubContainer container=(TopicSubContainer)i.next();
-                final ConsumerMessageRef ref=new ConsumerMessageRef();
-                ref.setAckEntry(ackEntry);
-                ref.setMessageEntry(messageEntry);
-                ref.setMessageId(messageId);
-                container.add(ref);
-            }
-        }
-    }
-
-    public ReferenceData getMessageReference(final MessageId identity) throws IOException{
-        final ReferenceRecord result=messageContainer.get(identity);
-        if(result==null)
-            return null;
-        return result.getData();
-    }
-
-    public void addReferenceFileIdsInUse(){
-        for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
-            TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
-            if(subAck.getCount()>0){
-                ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
-                addInterest(rr);
-            }
-        }
-    }
-
-    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
-        ListContainer container=store.getListContainer(key,"topic-subs-references");
-        Marshaller marshaller=new ConsumerMessageRefMarshaller();
-        container.setMarshaller(marshaller);
-        TopicSubContainer tsc=new TopicSubContainer(container);
-        subscriberMessages.put(key,tsc);
-        return container;
-    }
-
-    public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
-            MessageId messageId) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            ConsumerMessageRef ref=container.remove(messageId);
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
-                        StoreEntry entry=ref.getAckEntry();
-                        entry=ackContainer.refresh(entry);
-                        ackContainer.remove(entry);
-                        ReferenceRecord rr=messageContainer.get(messageId);
-                        if(rr!=null){
-                            entry=tsa.getMessageEntry();
-                            entry=messageContainer.refresh(entry);
-                            messageContainer.remove(entry);
-                            removeInterest(rr);
-                        }
-                    }else{
-                       
-                        ackContainer.update(ref.getAckEntry(),tsa);
-                    }
-                }
-            }
-        }
-    }
-
-    public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
-            throws IOException{
-        SubscriptionInfo info=new SubscriptionInfo();
-        info.setDestination(destination);
-        info.setClientId(clientId);
-        info.setSelector(selector);
-        info.setSubcriptionName(subscriptionName);
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        // if already exists - won't add it again as it causes data files
-        // to hang around
-        if(!subscriberContainer.containsKey(key)){
-            subscriberContainer.put(key,info);
-        }
-        // add the subscriber
-        ListContainer container=addSubscriberMessageContainer(key);
-        if(retroactive){
-            /*
-            for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
-                ConsumerMessageRef ref=new ConsumerMessageRef();
-                ref.setAckEntry(entry);
-                ref.setMessageEntry(tsa.getMessageEntry());
-                container.add(ref);
-            }
-            */
-        }
-    }
-
-    public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        removeSubscriberMessageContainer(key);
-    }
-
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
-        return (SubscriptionInfo[])subscriberContainer.values().toArray(
-                new SubscriptionInfo[subscriberContainer.size()]);
-    }
-
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriberName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        return  container != null ? container.size() : 0;
-    }
-
-    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
-        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
-    }
-
-    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
-            MessageRecoveryListener listener) throws Exception{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            int count=0;
-            StoreEntry entry=container.getBatchEntry();
-            if(entry==null){
-                entry=container.getEntry();
-            }else{
-                entry=container.refreshEntry(entry);
-                if(entry!=null){
-                    entry=container.getNextEntry(entry);
-                }
-            }
-            if(entry!=null){
-                do{
-                    ConsumerMessageRef consumerRef=container.get(entry);
-                    Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
-                    if(msg!=null){
-                        recover(listener,msg);
-                        count++;
-                    }
-                    container.setBatchEntry(entry);
-                    entry=container.getNextEntry(entry);
-                }while(entry!=null&&count<maxReturned&&listener.hasSpace());
-            }
-        }
-        listener.finished();
-    }
-
-    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-            throws Exception{
-        
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            for(Iterator i=container.iterator();i.hasNext();){
-                ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-                Object msg=messageContainer.get(ref.getMessageEntry());
-                if(msg!=null){
-                    recover(listener,msg);
-                }
-            }
-        }
-        listener.finished();
-    }
-
-    public synchronized void resetBatching(String clientId,String subscriptionName){
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
-        if(topicSubContainer!=null){
-            topicSubContainer.reset();
-        }
-    }
-
-    protected void removeSubscriberMessageContainer(Object key) throws IOException{
-        subscriberContainer.remove(key);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
-        for(Iterator i=container.iterator();i.hasNext();){
-            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ref.getAckEntry());
-                        messageContainer.remove(tsa.getMessageEntry());
-                    }else{
-                        ackContainer.update(ref.getAckEntry(),tsa);
-                    }
-                }
-            }
-        }
-        store.deleteListContainer(key,"topic-subs-references");
-    }
-
-    protected String getSubscriptionKey(String clientId,String subscriberName){
-        String result=clientId+":";
-        result+=subscriberName!=null?subscriberName:"NOT_SET";
-        return result;
-    }
-}
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicReferenceStore;
+
+public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
+
+    protected ListContainer<TopicSubAck> ackContainer;
+    private Map subscriberContainer;
+    private Store store;
+    protected Map subscriberMessages=new ConcurrentHashMap();
+
+    public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
+            MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
+        super(adapter,messageContainer,destination);
+        this.store=store;
+        this.ackContainer=ackContainer;
+        subscriberContainer=subsContainer;
+        // load all the Ack containers
+        for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
+            Object key=i.next();
+            addSubscriberMessageContainer(key);
+        }
+    }
+
+    protected MessageId getMessageId(Object object){
+        return new MessageId(((ReferenceRecord)object).getMessageId());
+    }
+
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        throw new RuntimeException("Use addMessageReference instead");
+    }
+
+    public synchronized Message getMessage(MessageId identity) throws IOException{
+        throw new RuntimeException("Use addMessageReference instead");
+    }
+
+    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
+        ReferenceRecord record=(ReferenceRecord)msg;
+        listener.recoverMessageReference(new MessageId(record.getMessageId()));
+    }
+
+    public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
+        final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
+        final int subscriberCount=subscriberMessages.size();
+        if(subscriberCount>0){
+            final StoreEntry messageEntry=messageContainer.place(messageId,record);
+            addInterest(record);
+            final TopicSubAck tsa=new TopicSubAck();
+            tsa.setCount(subscriberCount);
+            tsa.setMessageEntry(messageEntry);
+            final StoreEntry ackEntry=ackContainer.placeLast(tsa);
+            for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+                final TopicSubContainer container=(TopicSubContainer)i.next();
+                final ConsumerMessageRef ref=new ConsumerMessageRef();
+                ref.setAckEntry(ackEntry);
+                ref.setMessageEntry(messageEntry);
+                ref.setMessageId(messageId);
+                container.add(ref);
+            }
+        }
+    }
+
+    public ReferenceData getMessageReference(final MessageId identity) throws IOException{
+        final ReferenceRecord result=messageContainer.get(identity);
+        if(result==null)
+            return null;
+        return result.getData();
+    }
+
+    public void addReferenceFileIdsInUse(){
+        for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+            TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
+            if(subAck.getCount()>0){
+                ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
+                addInterest(rr);
+            }
+        }
+    }
+
+    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
+        ListContainer container=store.getListContainer(key,"topic-subs-references");
+        Marshaller marshaller=new ConsumerMessageRefMarshaller();
+        container.setMarshaller(marshaller);
+        TopicSubContainer tsc=new TopicSubContainer(container);
+        subscriberMessages.put(key,tsc);
+        return container;
+    }
+
+    public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
+            MessageId messageId) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        if(container!=null){
+            ConsumerMessageRef ref=container.remove(messageId);
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        StoreEntry entry=ref.getAckEntry();
+                        entry=ackContainer.refresh(entry);
+                        ackContainer.remove(entry);
+                        ReferenceRecord rr=messageContainer.get(messageId);
+                        if(rr!=null){
+                            entry=tsa.getMessageEntry();
+                            entry=messageContainer.refresh(entry);
+                            messageContainer.remove(entry);
+                            removeInterest(rr);
+                        }
+                    }else{
+                       
+                        ackContainer.update(ref.getAckEntry(),tsa);
+                    }
+                }
+            }
+        }
+    }
+
+    public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+            throws IOException{
+        SubscriptionInfo info=new SubscriptionInfo();
+        info.setDestination(destination);
+        info.setClientId(clientId);
+        info.setSelector(selector);
+        info.setSubcriptionName(subscriptionName);
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        // if already exists - won't add it again as it causes data files
+        // to hang around
+        if(!subscriberContainer.containsKey(key)){
+            subscriberContainer.put(key,info);
+        }
+        // add the subscriber
+        ListContainer container=addSubscriberMessageContainer(key);
+        if(retroactive){
+            /*
+            for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+                ConsumerMessageRef ref=new ConsumerMessageRef();
+                ref.setAckEntry(entry);
+                ref.setMessageEntry(tsa.getMessageEntry());
+                container.add(ref);
+            }
+            */
+        }
+    }
+
+    public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        removeSubscriberMessageContainer(key);
+    }
+
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+        return (SubscriptionInfo[])subscriberContainer.values().toArray(
+                new SubscriptionInfo[subscriberContainer.size()]);
+    }
+
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriberName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        return  container != null ? container.size() : 0;
+    }
+
+    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
+        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+    }
+
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+            MessageRecoveryListener listener) throws Exception{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        if(container!=null){
+            int count=0;
+            StoreEntry entry=container.getBatchEntry();
+            if(entry==null){
+                entry=container.getEntry();
+            }else{
+                entry=container.refreshEntry(entry);
+                if(entry!=null){
+                    entry=container.getNextEntry(entry);
+                }
+            }
+            if(entry!=null){
+                do{
+                    ConsumerMessageRef consumerRef=container.get(entry);
+                    Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
+                    if(msg!=null){
+                        recover(listener,msg);
+                        count++;
+                    }
+                    container.setBatchEntry(entry);
+                    entry=container.getNextEntry(entry);
+                }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+            }
+        }
+        listener.finished();
+    }
+
+    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+            throws Exception{
+        
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        if(container!=null){
+            for(Iterator i=container.iterator();i.hasNext();){
+                ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+                Object msg=messageContainer.get(ref.getMessageEntry());
+                if(msg!=null){
+                    recover(listener,msg);
+                }
+            }
+        }
+        listener.finished();
+    }
+
+    public synchronized void resetBatching(String clientId,String subscriptionName){
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
+        if(topicSubContainer!=null){
+            topicSubContainer.reset();
+        }
+    }
+
+    protected void removeSubscriberMessageContainer(Object key) throws IOException{
+        subscriberContainer.remove(key);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
+        for(Iterator i=container.iterator();i.hasNext();){
+            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
+                    }else{
+                        ackContainer.update(ref.getAckEntry(),tsa);
+                    }
+                }
+            }
+        }
+        store.deleteListContainer(key,"topic-subs-references");
+    }
+
+    protected String getSubscriptionKey(String clientId,String subscriberName){
+        String result=clientId+":";
+        result+=subscriberName!=null?subscriberName:"NOT_SET";
+        return result;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java Thu Mar  8 06:20:29 2007
@@ -1,74 +1,74 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-/**
- * An optimized buffered input stream for Tcp
- * 
- * @version $Revision: 1.1.1.1 $
- */
-public class NIOInputStream extends InputStream {
-	
-    protected int count;
-    protected int position;
-	private final ByteBuffer in;
-
-    public NIOInputStream(ByteBuffer in){
-		this.in = in;
-    }
-
-    public int read() throws IOException {
-    	try {
-    		int rc = in.get()& 0xff; 
-    		return rc;
-    	} catch ( BufferUnderflowException e ) {
-    		return -1;
-    	}
-    }
-
-    public int read(byte b[],int off,int len) throws IOException{
-    	if( in.hasRemaining() ) {
-	    	int rc = Math.min(len, in.remaining());
-	    	in.get(b, off, rc);
-	    	return rc;
-    	} else {
-    		return len == 0 ? 0 : -1; 
-    	}
-    }
-
-    public long skip(long n) throws IOException{
-    	int rc = Math.min((int)n, in.remaining());
-    	in.position(in.position()+rc);
-        return rc;
-    }
-
-    public int available() throws IOException{
-        return in.remaining();
-    }
-
-    public boolean markSupported(){
-        return false;
-    }
-
-    public void close() throws IOException{
-    }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+/**
+ * An optimized buffered input stream for Tcp
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NIOInputStream extends InputStream {
+	
+    protected int count;
+    protected int position;
+	private final ByteBuffer in;
+
+    public NIOInputStream(ByteBuffer in){
+		this.in = in;
+    }
+
+    public int read() throws IOException {
+    	try {
+    		int rc = in.get()& 0xff; 
+    		return rc;
+    	} catch ( BufferUnderflowException e ) {
+    		return -1;
+    	}
+    }
+
+    public int read(byte b[],int off,int len) throws IOException{
+    	if( in.hasRemaining() ) {
+	    	int rc = Math.min(len, in.remaining());
+	    	in.get(b, off, rc);
+	    	return rc;
+    	} else {
+    		return len == 0 ? 0 : -1; 
+    	}
+    }
+
+    public long skip(long n) throws IOException{
+    	int rc = Math.min((int)n, in.remaining());
+    	in.position(in.position()+rc);
+        return rc;
+    }
+
+    public int available() throws IOException{
+        return in.remaining();
+    }
+
+    public boolean markSupported(){
+        return false;
+    }
+
+    public void close() throws IOException{
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Thu Mar  8 06:20:29 2007
@@ -1,182 +1,182 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * An optimized buffered outputstream for Tcp
- *
- * @version $Revision: 1.1.1.1 $
- */
-
-public class NIOOutputStream extends OutputStream {
-	
-	private final static int BUFFER_SIZE = 8192;
-
-	private final WritableByteChannel out;
-    private final byte[] buffer;
-	private final ByteBuffer byteBuffer;
-    
-    private int count;
-    private boolean closed;
-
-    /**
-     * Constructor
-     *
-     * @param out
-     */
-    public NIOOutputStream(WritableByteChannel out) {
-        this(out, BUFFER_SIZE);
-    }
-
-    /**
-     * Creates a new buffered output stream to write data to the specified underlying output stream with the specified
-     * buffer size.
-     *
-     * @param out  the underlying output stream.
-     * @param size the buffer size.
-     * @throws IllegalArgumentException if size <= 0.
-     */
-    public NIOOutputStream(WritableByteChannel out, int size) {
-        this.out = out;
-		if (size <= 0) {
-            throw new IllegalArgumentException("Buffer size <= 0");
-        }
-        buffer = new byte[size];
-        byteBuffer = ByteBuffer.wrap(buffer);
-    }
-
-    /**
-     * write a byte on to the stream
-     *
-     * @param b - byte to write
-     * @throws IOException
-     */
-    public void write(int b) throws IOException {
-        checkClosed();
-        if (availableBufferToWrite() < 1) {
-            flush();
-        }
-        buffer[count++] = (byte) b;
-    }
-
-
-    /**
-     * write a byte array to the stream
-     *
-     * @param b   the byte buffer
-     * @param off the offset into the buffer
-     * @param len the length of data to write
-     * @throws IOException
-     */
-    public void write(byte b[], int off, int len) throws IOException {
-        checkClosed();
-        if (availableBufferToWrite() < len) {
-            flush();
-        }
-        if (buffer.length >= len) {
-            System.arraycopy(b, off, buffer, count, len);
-            count += len;
-        }
-        else {
-        	write( ByteBuffer.wrap(b, off, len));
-        }
-    }
-
-	/**
-     * flush the data to the output stream
-     * This doesn't call flush on the underlying outputstream, because
-     * Tcp is particularly efficent at doing this itself ....
-     *
-     * @throws IOException
-     */
-    public void flush() throws IOException {
-        if (count > 0 && out != null) {
-        	byteBuffer.position(0);
-        	byteBuffer.limit(count);
-            write(byteBuffer);
-            count = 0;
-        }
-    }
-
-    /**
-     * close this stream
-     *
-     * @throws IOException
-     */
-    public void close() throws IOException {
-        super.close();
-        closed = true;
-    }
-
-
-    /**
-     * Checks that the stream has not been closed
-     *
-     * @throws IOException
-     */
-    protected void checkClosed() throws IOException {
-        if (closed) {
-            throw new EOFException("Cannot write to the stream any more it has already been closed");
-        }
-    }
-
-    /**
-     * @return the amount free space in the buffer
-     */
-    private int availableBufferToWrite() {
-        return buffer.length - count;
-    }
-    
-    protected void write(ByteBuffer data) throws IOException {
-        int remaining = data.remaining();        
-        int lastRemaining = remaining-1;
-        long delay=1;
-        while( remaining > 0 ) {
-        	
-	        // We may need to do a little bit of sleeping to avoid a busy loop.
-            // Slow down if no data was written out.. 
-	        if( remaining == lastRemaining ) {
-	            try {
-                    // Use exponential rollback to increase sleep time.
-                    Thread.sleep(delay);
-                    delay *= 2;
-                    if( delay > 1000 ) {
-                        delay = 1000;
-                    }
-                } catch (InterruptedException e) {
-                    throw new InterruptedIOException();
-                }                        
-	        } else {
-	            delay = 1;
-	        }        	        
-	        lastRemaining = remaining;
-	        
-            // Since the write is non-blocking, all the data may not have been written.
-            out.write( data );        
-            remaining = data.remaining();        	        
-        }    
-	}
-    
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * An optimized buffered output stream for TCP
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+
+public class NIOOutputStream extends OutputStream {
+	
+	private final static int BUFFER_SIZE = 8192;
+
+	private final WritableByteChannel out;
+    private final byte[] buffer;
+	private final ByteBuffer byteBuffer;
+    
+    private int count;
+    private boolean closed;
+
+    /**
+     * Constructor
+     *
+     * @param out
+     */
+    public NIOOutputStream(WritableByteChannel out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified underlying channel with the specified
+     * buffer size.
+     *
+     * @param out  the underlying channel.
+     * @param size the buffer size.
+     * @throws IllegalArgumentException if size <= 0.
+     */
+    public NIOOutputStream(WritableByteChannel out, int size) {
+        this.out = out;
+		if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buffer = new byte[size];
+        byteBuffer = ByteBuffer.wrap(buffer);
+    }
+
+    /**
+     * write a byte on to the stream
+     *
+     * @param b - byte to write
+     * @throws IOException
+     */
+    public void write(int b) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < 1) {
+            flush();
+        }
+        buffer[count++] = (byte) b;
+    }
+
+
+    /**
+     * write a byte array to the stream
+     *
+     * @param b   the byte buffer
+     * @param off the offset into the buffer
+     * @param len the length of data to write
+     * @throws IOException
+     */
+    public void write(byte b[], int off, int len) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < len) {
+            flush();
+        }
+        if (buffer.length >= len) {
+            System.arraycopy(b, off, buffer, count, len);
+            count += len;
+        }
+        else {
+        	write( ByteBuffer.wrap(b, off, len));
+        }
+    }
+
+	/**
+     * flush the data to the output stream
+     * This doesn't call flush on the underlying outputstream, because
+     * Tcp is particularly efficient at doing this itself ....
+     *
+     * @throws IOException
+     */
+    public void flush() throws IOException {
+        if (count > 0 && out != null) {
+        	byteBuffer.position(0);
+        	byteBuffer.limit(count);
+            write(byteBuffer);
+            count = 0;
+        }
+    }
+
+    /**
+     * close this stream
+     *
+     * @throws IOException
+     */
+    public void close() throws IOException {
+        super.close();
+        closed = true;
+    }
+
+
+    /**
+     * Checks that the stream has not been closed
+     *
+     * @throws IOException
+     */
+    protected void checkClosed() throws IOException {
+        if (closed) {
+            throw new EOFException("Cannot write to the stream any more it has already been closed");
+        }
+    }
+
+    /**
+     * @return the amount of free space in the buffer
+     */
+    private int availableBufferToWrite() {
+        return buffer.length - count;
+    }
+    
+    protected void write(ByteBuffer data) throws IOException {
+        int remaining = data.remaining();        
+        int lastRemaining = remaining-1;
+        long delay=1;
+        while( remaining > 0 ) {
+        	
+	        // We may need to do a little bit of sleeping to avoid a busy loop.
+            // Slow down if no data was written out.. 
+	        if( remaining == lastRemaining ) {
+	            try {
+                    // Use exponential backoff to increase sleep time.
+                    Thread.sleep(delay);
+                    delay *= 2;
+                    if( delay > 1000 ) {
+                        delay = 1000;
+                    }
+                } catch (InterruptedException e) {
+                    throw new InterruptedIOException();
+                }                        
+	        } else {
+	            delay = 1;
+	        }        	        
+	        lastRemaining = remaining;
+	        
+            // Since the write is non-blocking, all the data may not have been written.
+            out.write( data );        
+            remaining = data.remaining();        	        
+        }    
+	}
+    
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Thu Mar  8 06:20:29 2007
@@ -1,156 +1,156 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import javax.net.SocketFactory;
-
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-
-/**
- * An implementation of the {@link Transport} interface using raw tcp/ip
- * 
- * @version $Revision$
- */
-public class NIOTransport extends TcpTransport {
-
-	//private static final Log log = LogFactory.getLog(NIOTransport.class);
-	private SocketChannel channel;
-	private SelectorSelection selection;
-	private ByteBuffer inputBuffer;
-	private ByteBuffer currentBuffer;
-	private int nextFrameSize;
-
-	public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
-		super(wireFormat, socketFactory, remoteLocation, localLocation);
-	}
-
-	public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
-		super(wireFormat, socket);
-	}
-
-	protected void initializeStreams() throws IOException {
-		channel = socket.getChannel();		
-		channel.configureBlocking(false);
-		
-		// listen for events telling us when the socket is readable.
-		selection = SelectorManager.getInstance().register(channel,
-				new SelectorManager.Listener() {
-					public void onSelect(SelectorSelection selection) {
-						serviceRead();
-					}
-					public void onError(SelectorSelection selection, Throwable error) {
-						if( error instanceof IOException ) {
-							onException((IOException) error);							
-						} else {
-							onException(IOExceptionSupport.create(error));							
-						}
-					}
-				});
-		
-		// Send the data via the channel
-//        inputBuffer = ByteBuffer.allocateDirect(8*1024);
-        inputBuffer = ByteBuffer.allocate(8*1024);
-        currentBuffer = inputBuffer;
-        nextFrameSize=-1;
-        currentBuffer.limit(4);
-        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024));
-        
-	}
-	
-    private void serviceRead() {
-        try {
-            while( true ) {
-            	
-	
-	            int readSize = channel.read(currentBuffer);
-	            if( readSize == -1 ) {
-					onException(new EOFException());
-	                selection.close();
-	                break;
-	            }
-	            if( readSize==0 ) {
-	                break;
-	            }
-	            
-            	if( currentBuffer.hasRemaining() )
-            		continue;
-
-	            // Are we trying to figure out the size of the next frame?
-	            if( nextFrameSize==-1 ) {
-	            	assert inputBuffer == currentBuffer;
-
-	            	// If the frame is too big to fit in our direct byte buffer,
-	            	// Then allocate a non direct byte buffer of the right size for it.
-	            	inputBuffer.flip();
-	            	nextFrameSize = inputBuffer.getInt()+4;
-	            	if( nextFrameSize > inputBuffer.capacity() ) {
-	            		currentBuffer = ByteBuffer.allocate(nextFrameSize);
-	            		currentBuffer.putInt(nextFrameSize);
-	            	} else {
-	            		inputBuffer.limit(nextFrameSize);	            		
-	            	}
-	            	
-            	} else {
-            		currentBuffer.flip();
-    				
-            		Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
-            		doConsume((Command) command);
-            		
-            		nextFrameSize=-1;
-    				inputBuffer.clear();
-    				inputBuffer.limit(4);
-    				currentBuffer = inputBuffer;
-            	}
-	            
-            }
-            
-        } catch (IOException e) {
-            onException(e);
-        } catch (Throwable e) {
-        	onException(IOExceptionSupport.create(e));
-        }
-    }
-
-
-	protected void doStart() throws Exception {
-        connect();
-        selection.setInterestOps(SelectionKey.OP_READ);
-        selection.enable();
-    }
-
-	protected void doStop(ServiceStopper stopper) throws Exception {
-		selection.disable();
-		super.doStop(stopper);		
-	}
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+
+/**
+ * An implementation of the {@link Transport} interface using raw tcp/ip
+ * 
+ * @version $Revision$
+ */
+public class NIOTransport extends TcpTransport {
+
+	//private static final Log log = LogFactory.getLog(NIOTransport.class);
+	private SocketChannel channel;
+	private SelectorSelection selection;
+	private ByteBuffer inputBuffer;
+	private ByteBuffer currentBuffer;
+	private int nextFrameSize;
+
+	public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+		super(wireFormat, socketFactory, remoteLocation, localLocation);
+	}
+
+	public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+		super(wireFormat, socket);
+	}
+
+	protected void initializeStreams() throws IOException {
+		channel = socket.getChannel();		
+		channel.configureBlocking(false);
+		
+		// listen for events telling us when the socket is readable.
+		selection = SelectorManager.getInstance().register(channel,
+				new SelectorManager.Listener() {
+					public void onSelect(SelectorSelection selection) {
+						serviceRead();
+					}
+					public void onError(SelectorSelection selection, Throwable error) {
+						if( error instanceof IOException ) {
+							onException((IOException) error);							
+						} else {
+							onException(IOExceptionSupport.create(error));							
+						}
+					}
+				});
+		
+		// Send the data via the channel
+//        inputBuffer = ByteBuffer.allocateDirect(8*1024);
+        inputBuffer = ByteBuffer.allocate(8*1024);
+        currentBuffer = inputBuffer;
+        nextFrameSize=-1;
+        currentBuffer.limit(4);
+        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024));
+        
+	}
+	
+    private void serviceRead() {
+        try {
+            while( true ) {
+            	
+	
+	            int readSize = channel.read(currentBuffer);
+	            if( readSize == -1 ) {
+					onException(new EOFException());
+	                selection.close();
+	                break;
+	            }
+	            if( readSize==0 ) {
+	                break;
+	            }
+	            
+            	if( currentBuffer.hasRemaining() )
+            		continue;
+
+	            // Are we trying to figure out the size of the next frame?
+	            if( nextFrameSize==-1 ) {
+	            	assert inputBuffer == currentBuffer;
+
+	            	// If the frame is too big to fit in our direct byte buffer,
+	            	// Then allocate a non direct byte buffer of the right size for it.
+	            	inputBuffer.flip();
+	            	nextFrameSize = inputBuffer.getInt()+4;
+	            	if( nextFrameSize > inputBuffer.capacity() ) {
+	            		currentBuffer = ByteBuffer.allocate(nextFrameSize);
+	            		currentBuffer.putInt(nextFrameSize);
+	            	} else {
+	            		inputBuffer.limit(nextFrameSize);	            		
+	            	}
+	            	
+            	} else {
+            		currentBuffer.flip();
+    				
+            		Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+            		doConsume((Command) command);
+            		
+            		nextFrameSize=-1;
+    				inputBuffer.clear();
+    				inputBuffer.limit(4);
+    				currentBuffer = inputBuffer;
+            	}
+	            
+            }
+            
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+        	onException(IOExceptionSupport.create(e));
+        }
+    }
+
+
+	protected void doStart() throws Exception {
+        connect();
+        selection.setInterestOps(SelectionKey.OP_READ);
+        selection.enable();
+    }
+
+	protected void doStop(ServiceStopper stopper) throws Exception {
+		selection.disable();
+		super.doStop(stopper);		
+	}
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java Thu Mar  8 06:20:29 2007
@@ -1,110 +1,110 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
-
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.transport.tcp.TcpTransportServer;
-
-public class NIOTransportFactory extends TcpTransportFactory {
-		
-	protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-		return new TcpTransportServer(this, location, serverSocketFactory) {
-			protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
-				return new NIOTransport(format,socket);
-			}			
-		};
-	}
-	
-	protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
-		return new NIOTransport(wf, socketFactory, location, localLocation);
-	}
-
-	
-    protected ServerSocketFactory createServerSocketFactory() {
-        return new ServerSocketFactory() {
-			public ServerSocket createServerSocket(int port) throws IOException {
-		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-		        serverSocketChannel.socket().bind(new InetSocketAddress(port));
-				return serverSocketChannel.socket();
-			}
-			public ServerSocket createServerSocket(int port, int backlog) throws IOException {
-		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-		        serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
-				return serverSocketChannel.socket();
-			}
-			public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
-		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-		        serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
-				return serverSocketChannel.socket();
-			}
-        };
-    }
-    
-    protected SocketFactory createSocketFactory() {
-        return new SocketFactory() {
-
-        	public Socket createSocket() throws IOException {
-		        SocketChannel channel = SocketChannel.open();
-        		return channel.socket();
-        	}
-        	
-			public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.connect(new InetSocketAddress(host, port));
-				return channel.socket();
-			}
-
-			public Socket createSocket(InetAddress address, int port) throws IOException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.connect(new InetSocketAddress(address, port));
-				return channel.socket();
-			}
-
-			public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
-		        channel.connect(new InetSocketAddress(address, port));
-				return channel.socket();
-			}
-
-			public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
-		        SocketChannel channel = SocketChannel.open();
-		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
-		        channel.connect(new InetSocketAddress(address, port));
-				return channel.socket();
-			}
-        };
-    }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+
+public class NIOTransportFactory extends TcpTransportFactory {
+		
+	protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+		return new TcpTransportServer(this, location, serverSocketFactory) {
+			protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+				return new NIOTransport(format,socket);
+			}			
+		};
+	}
+	
+	protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+		return new NIOTransport(wf, socketFactory, location, localLocation);
+	}
+
+	
+    protected ServerSocketFactory createServerSocketFactory() {
+        return new ServerSocketFactory() {
+			public ServerSocket createServerSocket(int port) throws IOException {
+		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+		        serverSocketChannel.socket().bind(new InetSocketAddress(port));
+				return serverSocketChannel.socket();
+			}
+			public ServerSocket createServerSocket(int port, int backlog) throws IOException {
+		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+		        serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
+				return serverSocketChannel.socket();
+			}
+			public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
+		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+		        serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
+				return serverSocketChannel.socket();
+			}
+        };
+    }
+    
+    protected SocketFactory createSocketFactory() {
+        return new SocketFactory() {
+
+        	public Socket createSocket() throws IOException {
+		        SocketChannel channel = SocketChannel.open();
+        		return channel.socket();
+        	}
+        	
+			public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+		        SocketChannel channel = SocketChannel.open();
+		        channel.connect(new InetSocketAddress(host, port));
+				return channel.socket();
+			}
+
+			public Socket createSocket(InetAddress address, int port) throws IOException {
+		        SocketChannel channel = SocketChannel.open();
+		        channel.connect(new InetSocketAddress(address, port));
+				return channel.socket();
+			}
+
+			public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
+		        SocketChannel channel = SocketChannel.open();
+		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
+		        channel.connect(new InetSocketAddress(address, port));
+				return channel.socket();
+			}
+
+			public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
+		        SocketChannel channel = SocketChannel.open();
+		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
+		        channel.connect(new InetSocketAddress(address, port));
+				return channel.socket();
+			}
+        };
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java Thu Mar  8 06:20:29 2007
@@ -1,109 +1,109 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * The SelectorManager will manage one Selector and the thread that checks the
- * selector.
- * 
- * We may need to consider running more than one thread to check the selector if
- * servicing the selector takes too long.
- * 
- * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
- */
-final public class SelectorManager {
-
-	static final public SelectorManager singleton = new SelectorManager();
-	static SelectorManager getInstance() { 
-		return singleton;
-	}
-	
-	public interface Listener {
-		public void onSelect(SelectorSelection selector);
-		public void onError(SelectorSelection selection, Throwable error);
-	}
-	
-	private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory(){
-		public Thread newThread(Runnable r) {
-			Thread rc = new Thread(r);
-			rc.setName("NIO Transport Thread");
-			return rc;
-		}});
-	private Executor channelExecutor = selectorExecutor;
-	private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
-	private int maxChannelsPerWorker = 64;
-	
-	public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
-	 	throws IOException {
-
-		SelectorWorker worker = null;
-		if (freeWorkers.size() > 0) {
-			worker = freeWorkers.getFirst();
-		} else {
-			worker = new SelectorWorker(this);
-			freeWorkers.addFirst(worker);
-		}
-
-		SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);				
-		return selection;
-	}
-
-	synchronized void onWorkerFullEvent(SelectorWorker worker) {
-		freeWorkers.remove(worker);
-	}
-
-	synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
-		freeWorkers.remove(worker);
-	}
-
-	synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
-		freeWorkers.add(worker);
-	}
-
-	public Executor getChannelExecutor() {
-		return channelExecutor;
-	}
-
-	public void setChannelExecutor(Executor channelExecutor) {
-		this.channelExecutor = channelExecutor;
-	}
-
-	public int getMaxChannelsPerWorker() {
-		return maxChannelsPerWorker;
-	}
-
-	public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
-		this.maxChannelsPerWorker = maxChannelsPerWorker;
-	}
-
-	public Executor getSelectorExecutor() {
-		return selectorExecutor;
-	}
-
-	public void setSelectorExecutor(Executor selectorExecutor) {
-		this.selectorExecutor = selectorExecutor;
-	}
-
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * The SelectorManager will manage one Selector and the thread that checks the
+ * selector.
+ * 
+ * We may need to consider running more than one thread to check the selector if
+ * servicing the selector takes too long.
+ * 
+ * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
+ */
+final public class SelectorManager {
+
+	static final public SelectorManager singleton = new SelectorManager();
+	static SelectorManager getInstance() { 
+		return singleton;
+	}
+	
+	public interface Listener {
+		public void onSelect(SelectorSelection selector);
+		public void onError(SelectorSelection selection, Throwable error);
+	}
+	
+	private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory(){
+		public Thread newThread(Runnable r) {
+			Thread rc = new Thread(r);
+			rc.setName("NIO Transport Thread");
+			return rc;
+		}});
+	private Executor channelExecutor = selectorExecutor;
+	private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
+	private int maxChannelsPerWorker = 64;
+	
+	public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
+	 	throws IOException {
+
+		SelectorWorker worker = null;
+		if (freeWorkers.size() > 0) {
+			worker = freeWorkers.getFirst();
+		} else {
+			worker = new SelectorWorker(this);
+			freeWorkers.addFirst(worker);
+		}
+
+		SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);				
+		return selection;
+	}
+
+	synchronized void onWorkerFullEvent(SelectorWorker worker) {
+		freeWorkers.remove(worker);
+	}
+
+	synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
+		freeWorkers.remove(worker);
+	}
+
+	synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
+		freeWorkers.add(worker);
+	}
+
+	public Executor getChannelExecutor() {
+		return channelExecutor;
+	}
+
+	public void setChannelExecutor(Executor channelExecutor) {
+		this.channelExecutor = channelExecutor;
+	}
+
+	public int getMaxChannelsPerWorker() {
+		return maxChannelsPerWorker;
+	}
+
+	public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
+		this.maxChannelsPerWorker = maxChannelsPerWorker;
+	}
+
+	public Executor getSelectorExecutor() {
+		return selectorExecutor;
+	}
+
+	public void setSelectorExecutor(Executor selectorExecutor) {
+		this.selectorExecutor = selectorExecutor;
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Thu Mar  8 06:20:29 2007
@@ -1,72 +1,72 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.activemq.transport.nio.SelectorManager.Listener;
-
-/**
- * 
- * @author chirino
- */
-final public class SelectorSelection {
-
-	private final SelectorWorker worker;
-	private final SelectionKey key;
-	private final Listener listener;
-	private int interest;
-
-
-	public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
-		this.worker = worker;
-		this.listener = listener;
-		this.key = socketChannel.register(worker.selector, 0, this);
-		worker.incrementUseCounter();
-	}
-
-	public void setInterestOps(int ops) {
-		interest = ops;
-	}
-
-	public void enable() {
-		key.interestOps(interest);				
-		worker.selector.wakeup();
-	}
-
-	public void disable() {
-		key.interestOps(0);
-	}
-
-	public void close() {
-		worker.decrementUseCounter();
-		key.cancel();
-		worker.selector.wakeup();
-	}
-
-	public void onSelect() {
-		listener.onSelect(this);
-	}
-
-	public void onError(Throwable e) {
-		listener.onError(this, e);
-	}
-
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activemq.transport.nio.SelectorManager.Listener;
+
+/**
+ * 
+ * @author chirino
+ */
+final public class SelectorSelection {
+
+	private final SelectorWorker worker;
+	private final SelectionKey key;
+	private final Listener listener;
+	private int interest;
+
+
+	public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
+		this.worker = worker;
+		this.listener = listener;
+		this.key = socketChannel.register(worker.selector, 0, this);
+		worker.incrementUseCounter();
+	}
+
+	public void setInterestOps(int ops) {
+		interest = ops;
+	}
+
+	public void enable() {
+		key.interestOps(interest);				
+		worker.selector.wakeup();
+	}
+
+	public void disable() {
+		key.interestOps(0);
+	}
+
+	public void close() {
+		worker.decrementUseCounter();
+		key.cancel();
+		worker.selector.wakeup();
+	}
+
+	public void onSelect() {
+		listener.onSelect(this);
+	}
+
+	public void onError(Throwable e) {
+		listener.onError(this, e);
+	}
+
 }

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Thu Mar  8 06:20:29 2007
@@ -1,130 +1,130 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-public class SelectorWorker implements Runnable {
-
-	private final static AtomicInteger NEXT_ID = new AtomicInteger();
-
-	final SelectorManager manager;
-	final Selector selector;
-	final int id = NEXT_ID.getAndIncrement(); 
-	final AtomicInteger useCounter = new AtomicInteger();
-	final private int maxChannelsPerWorker;
-
-
-	public SelectorWorker(SelectorManager manager) throws IOException {
-		this.manager = manager;
-		selector = Selector.open();
-		maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
-	}
-	
-	void incrementUseCounter() {
-		int use = useCounter.getAndIncrement();
-		if( use == 0 ) {
-			manager.getSelectorExecutor().execute(this);
-		} else if( use+1 == maxChannelsPerWorker ) {
-			manager.onWorkerFullEvent(this);
-		}
-	}
-
-	void decrementUseCounter() {
-		int use = useCounter.getAndDecrement();
-		if (use == 1) {
-			manager.onWorkerEmptyEvent(this);
-		} else if (use == maxChannelsPerWorker ) {
-			manager.onWorkerNotFullEvent(this);
-		}
-	}
-
-	boolean isRunning() {
-		return useCounter.get()!=0;
-	}
-
-	public void run() {
-
-		String origName = Thread.currentThread().getName();
-		try {
-			Thread.currentThread().setName("Selector Worker: " + id);
-			while (isRunning()) {
-
-				int count = selector.select(10);
-				if (count == 0)
-					continue;
-				
-				if (!isRunning())
-					return;
-
-				// Get a java.util.Set containing the SelectionKey objects
-				// for all channels that are ready for I/O.
-				Set keys = selector.selectedKeys();
-
-				for (Iterator i = keys.iterator(); i.hasNext();) {
-					final SelectionKey key = (SelectionKey) i.next();
-					i.remove();
-
-					final SelectorSelection s = (SelectorSelection) key.attachment();
-					try {
-						s.disable();
-						
-						// Kick off another thread to find newly selected keys while we process the 
-						// currently selected keys                
-						manager.getChannelExecutor().execute(new Runnable() {
-							public void run() {
-								try {
-									s.onSelect();
-									s.enable();
-								} catch (Throwable e) {
-									s.onError(e);
-								}
-							}
-						});
-						
-					} catch ( Throwable e ) {
-						s.onError(e);
-					}
-					
-				}
-
-			}
-		} catch (IOException e) {
-			
-			// Don't accept any more slections
-			manager.onWorkerEmptyEvent(this);
-
-			// Notify all the selections that the error occurred.
-			Set keys = selector.keys();
-			for (Iterator i = keys.iterator(); i.hasNext();) {
-				SelectionKey key = (SelectionKey) i.next();
-				SelectorSelection s = (SelectorSelection) key.attachment();
-				s.onError(e);
-			}
-			
-		} finally {
-			Thread.currentThread().setName(origName);
-		}
-	}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class SelectorWorker implements Runnable {
+
+	private final static AtomicInteger NEXT_ID = new AtomicInteger();
+
+	final SelectorManager manager;
+	final Selector selector;
+	final int id = NEXT_ID.getAndIncrement(); 
+	final AtomicInteger useCounter = new AtomicInteger();
+	final private int maxChannelsPerWorker;
+
+
+	public SelectorWorker(SelectorManager manager) throws IOException {
+		this.manager = manager;
+		selector = Selector.open();
+		maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+	}
+	
+	void incrementUseCounter() {
+		int use = useCounter.getAndIncrement();
+		if( use == 0 ) {
+			manager.getSelectorExecutor().execute(this);
+		} else if( use+1 == maxChannelsPerWorker ) {
+			manager.onWorkerFullEvent(this);
+		}
+	}
+
+	void decrementUseCounter() {
+		int use = useCounter.getAndDecrement();
+		if (use == 1) {
+			manager.onWorkerEmptyEvent(this);
+		} else if (use == maxChannelsPerWorker ) {
+			manager.onWorkerNotFullEvent(this);
+		}
+	}
+
+	boolean isRunning() {
+		return useCounter.get()!=0;
+	}
+
+	public void run() {
+
+		String origName = Thread.currentThread().getName();
+		try {
+			Thread.currentThread().setName("Selector Worker: " + id);
+			while (isRunning()) {
+
+				int count = selector.select(10);
+				if (count == 0)
+					continue;
+				
+				if (!isRunning())
+					return;
+
+				// Get a java.util.Set containing the SelectionKey objects
+				// for all channels that are ready for I/O.
+				Set keys = selector.selectedKeys();
+
+				for (Iterator i = keys.iterator(); i.hasNext();) {
+					final SelectionKey key = (SelectionKey) i.next();
+					i.remove();
+
+					final SelectorSelection s = (SelectorSelection) key.attachment();
+					try {
+						s.disable();
+						
+						// Kick off another thread to find newly selected keys while we process the 
+						// currently selected keys                
+						manager.getChannelExecutor().execute(new Runnable() {
+							public void run() {
+								try {
+									s.onSelect();
+									s.enable();
+								} catch (Throwable e) {
+									s.onError(e);
+								}
+							}
+						});
+						
+					} catch ( Throwable e ) {
+						s.onError(e);
+					}
+					
+				}
+
+			}
+		} catch (IOException e) {
+			
+			// Don't accept any more slections
+			manager.onWorkerEmptyEvent(this);
+
+			// Notify all the selections that the error occurred.
+			Set keys = selector.keys();
+			for (Iterator i = keys.iterator(); i.hasNext();) {
+				SelectionKey key = (SelectionKey) i.next();
+				SelectorSelection s = (SelectorSelection) key.attachment();
+				s.onError(e);
+			}
+			
+		} finally {
+			Thread.currentThread().setName(origName);
+		}
+	}
 }

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TimeStampTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native