You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/08 15:20:38 UTC
svn commit: r516048 [10/14] - in /activemq/trunk: activemq-book/src/docbkx/
activemq-core/src/main/java/org/apache/activemq/
activemq-core/src/main/java/org/apache/activemq/blob/
activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ ac...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Mar 8 06:20:29 2007
@@ -1,277 +1,277 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.activemq.store.kahadaptor;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.kaha.ListContainer;
-import org.apache.activemq.kaha.MapContainer;
-import org.apache.activemq.kaha.Marshaller;
-import org.apache.activemq.kaha.Store;
-import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.TopicReferenceStore;
-
-public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
-
- protected ListContainer<TopicSubAck> ackContainer;
- private Map subscriberContainer;
- private Store store;
- protected Map subscriberMessages=new ConcurrentHashMap();
-
- public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
- MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
- super(adapter,messageContainer,destination);
- this.store=store;
- this.ackContainer=ackContainer;
- subscriberContainer=subsContainer;
- // load all the Ack containers
- for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
- Object key=i.next();
- addSubscriberMessageContainer(key);
- }
- }
-
- protected MessageId getMessageId(Object object){
- return new MessageId(((ReferenceRecord)object).getMessageId());
- }
-
- public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
- throw new RuntimeException("Use addMessageReference instead");
- }
-
- public synchronized Message getMessage(MessageId identity) throws IOException{
- throw new RuntimeException("Use addMessageReference instead");
- }
-
- protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
- ReferenceRecord record=(ReferenceRecord)msg;
- listener.recoverMessageReference(new MessageId(record.getMessageId()));
- }
-
- public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
- final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
- final int subscriberCount=subscriberMessages.size();
- if(subscriberCount>0){
- final StoreEntry messageEntry=messageContainer.place(messageId,record);
- addInterest(record);
- final TopicSubAck tsa=new TopicSubAck();
- tsa.setCount(subscriberCount);
- tsa.setMessageEntry(messageEntry);
- final StoreEntry ackEntry=ackContainer.placeLast(tsa);
- for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
- final TopicSubContainer container=(TopicSubContainer)i.next();
- final ConsumerMessageRef ref=new ConsumerMessageRef();
- ref.setAckEntry(ackEntry);
- ref.setMessageEntry(messageEntry);
- ref.setMessageId(messageId);
- container.add(ref);
- }
- }
- }
-
- public ReferenceData getMessageReference(final MessageId identity) throws IOException{
- final ReferenceRecord result=messageContainer.get(identity);
- if(result==null)
- return null;
- return result.getData();
- }
-
- public void addReferenceFileIdsInUse(){
- for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
- TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
- if(subAck.getCount()>0){
- ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
- addInterest(rr);
- }
- }
- }
-
- protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
- ListContainer container=store.getListContainer(key,"topic-subs-references");
- Marshaller marshaller=new ConsumerMessageRefMarshaller();
- container.setMarshaller(marshaller);
- TopicSubContainer tsc=new TopicSubContainer(container);
- subscriberMessages.put(key,tsc);
- return container;
- }
-
- public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
- MessageId messageId) throws IOException{
- String key=getSubscriptionKey(clientId,subscriptionName);
-
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- if(container!=null){
- ConsumerMessageRef ref=container.remove(messageId);
- if(ref!=null){
- TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
- if(tsa!=null){
- if(tsa.decrementCount()<=0){
- StoreEntry entry=ref.getAckEntry();
- entry=ackContainer.refresh(entry);
- ackContainer.remove(entry);
- ReferenceRecord rr=messageContainer.get(messageId);
- if(rr!=null){
- entry=tsa.getMessageEntry();
- entry=messageContainer.refresh(entry);
- messageContainer.remove(entry);
- removeInterest(rr);
- }
- }else{
-
- ackContainer.update(ref.getAckEntry(),tsa);
- }
- }
- }
- }
- }
-
- public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
- throws IOException{
- SubscriptionInfo info=new SubscriptionInfo();
- info.setDestination(destination);
- info.setClientId(clientId);
- info.setSelector(selector);
- info.setSubcriptionName(subscriptionName);
- String key=getSubscriptionKey(clientId,subscriptionName);
- // if already exists - won't add it again as it causes data files
- // to hang around
- if(!subscriberContainer.containsKey(key)){
- subscriberContainer.put(key,info);
- }
- // add the subscriber
- ListContainer container=addSubscriberMessageContainer(key);
- if(retroactive){
- /*
- for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
- TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
- ConsumerMessageRef ref=new ConsumerMessageRef();
- ref.setAckEntry(entry);
- ref.setMessageEntry(tsa.getMessageEntry());
- container.add(ref);
- }
- */
- }
- }
-
- public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
- String key=getSubscriptionKey(clientId,subscriptionName);
- removeSubscriberMessageContainer(key);
- }
-
- public SubscriptionInfo[] getAllSubscriptions() throws IOException{
- return (SubscriptionInfo[])subscriberContainer.values().toArray(
- new SubscriptionInfo[subscriberContainer.size()]);
- }
-
- public int getMessageCount(String clientId,String subscriberName) throws IOException{
- String key=getSubscriptionKey(clientId,subscriberName);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- return container != null ? container.size() : 0;
- }
-
- public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
- return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
- }
-
- public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
- MessageRecoveryListener listener) throws Exception{
- String key=getSubscriptionKey(clientId,subscriptionName);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- if(container!=null){
- int count=0;
- StoreEntry entry=container.getBatchEntry();
- if(entry==null){
- entry=container.getEntry();
- }else{
- entry=container.refreshEntry(entry);
- if(entry!=null){
- entry=container.getNextEntry(entry);
- }
- }
- if(entry!=null){
- do{
- ConsumerMessageRef consumerRef=container.get(entry);
- Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
- if(msg!=null){
- recover(listener,msg);
- count++;
- }
- container.setBatchEntry(entry);
- entry=container.getNextEntry(entry);
- }while(entry!=null&&count<maxReturned&&listener.hasSpace());
- }
- }
- listener.finished();
- }
-
- public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
- throws Exception{
-
- String key=getSubscriptionKey(clientId,subscriptionName);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- if(container!=null){
- for(Iterator i=container.iterator();i.hasNext();){
- ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
- Object msg=messageContainer.get(ref.getMessageEntry());
- if(msg!=null){
- recover(listener,msg);
- }
- }
- }
- listener.finished();
- }
-
- public synchronized void resetBatching(String clientId,String subscriptionName){
- String key=getSubscriptionKey(clientId,subscriptionName);
- TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
- if(topicSubContainer!=null){
- topicSubContainer.reset();
- }
- }
-
- protected void removeSubscriberMessageContainer(Object key) throws IOException{
- subscriberContainer.remove(key);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
- for(Iterator i=container.iterator();i.hasNext();){
- ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
- if(ref!=null){
- TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
- if(tsa!=null){
- if(tsa.decrementCount()<=0){
- ackContainer.remove(ref.getAckEntry());
- messageContainer.remove(tsa.getMessageEntry());
- }else{
- ackContainer.update(ref.getAckEntry(),tsa);
- }
- }
- }
- }
- store.deleteListContainer(key,"topic-subs-references");
- }
-
- protected String getSubscriptionKey(String clientId,String subscriberName){
- String result=clientId+":";
- result+=subscriberName!=null?subscriberName:"NOT_SET";
- return result;
- }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicReferenceStore;
+
+public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
+
+ protected ListContainer<TopicSubAck> ackContainer;
+ private Map subscriberContainer;
+ private Store store;
+ protected Map subscriberMessages=new ConcurrentHashMap();
+
+ public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
+ MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
+ super(adapter,messageContainer,destination);
+ this.store=store;
+ this.ackContainer=ackContainer;
+ subscriberContainer=subsContainer;
+ // load all the Ack containers
+ for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
+ Object key=i.next();
+ addSubscriberMessageContainer(key);
+ }
+ }
+
+ protected MessageId getMessageId(Object object){
+ return new MessageId(((ReferenceRecord)object).getMessageId());
+ }
+
+ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+ throw new RuntimeException("Use addMessageReference instead");
+ }
+
+ public synchronized Message getMessage(MessageId identity) throws IOException{
+ throw new RuntimeException("Use addMessageReference instead");
+ }
+
+ protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
+ ReferenceRecord record=(ReferenceRecord)msg;
+ listener.recoverMessageReference(new MessageId(record.getMessageId()));
+ }
+
+ public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
+ final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
+ final int subscriberCount=subscriberMessages.size();
+ if(subscriberCount>0){
+ final StoreEntry messageEntry=messageContainer.place(messageId,record);
+ addInterest(record);
+ final TopicSubAck tsa=new TopicSubAck();
+ tsa.setCount(subscriberCount);
+ tsa.setMessageEntry(messageEntry);
+ final StoreEntry ackEntry=ackContainer.placeLast(tsa);
+ for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+ final TopicSubContainer container=(TopicSubContainer)i.next();
+ final ConsumerMessageRef ref=new ConsumerMessageRef();
+ ref.setAckEntry(ackEntry);
+ ref.setMessageEntry(messageEntry);
+ ref.setMessageId(messageId);
+ container.add(ref);
+ }
+ }
+ }
+
+ public ReferenceData getMessageReference(final MessageId identity) throws IOException{
+ final ReferenceRecord result=messageContainer.get(identity);
+ if(result==null)
+ return null;
+ return result.getData();
+ }
+
+ public void addReferenceFileIdsInUse(){
+ for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+ TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
+ if(subAck.getCount()>0){
+ ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
+ addInterest(rr);
+ }
+ }
+ }
+
+ protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
+ ListContainer container=store.getListContainer(key,"topic-subs-references");
+ Marshaller marshaller=new ConsumerMessageRefMarshaller();
+ container.setMarshaller(marshaller);
+ TopicSubContainer tsc=new TopicSubContainer(container);
+ subscriberMessages.put(key,tsc);
+ return container;
+ }
+
+ public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
+ MessageId messageId) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+ if(container!=null){
+ ConsumerMessageRef ref=container.remove(messageId);
+ if(ref!=null){
+ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+ if(tsa!=null){
+ if(tsa.decrementCount()<=0){
+ StoreEntry entry=ref.getAckEntry();
+ entry=ackContainer.refresh(entry);
+ ackContainer.remove(entry);
+ ReferenceRecord rr=messageContainer.get(messageId);
+ if(rr!=null){
+ entry=tsa.getMessageEntry();
+ entry=messageContainer.refresh(entry);
+ messageContainer.remove(entry);
+ removeInterest(rr);
+ }
+ }else{
+
+ ackContainer.update(ref.getAckEntry(),tsa);
+ }
+ }
+ }
+ }
+ }
+
+ public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ throws IOException{
+ SubscriptionInfo info=new SubscriptionInfo();
+ info.setDestination(destination);
+ info.setClientId(clientId);
+ info.setSelector(selector);
+ info.setSubcriptionName(subscriptionName);
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ // if already exists - won't add it again as it causes data files
+ // to hang around
+ if(!subscriberContainer.containsKey(key)){
+ subscriberContainer.put(key,info);
+ }
+ // add the subscriber
+ ListContainer container=addSubscriberMessageContainer(key);
+ if(retroactive){
+ /*
+ for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+ TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+ ConsumerMessageRef ref=new ConsumerMessageRef();
+ ref.setAckEntry(entry);
+ ref.setMessageEntry(tsa.getMessageEntry());
+ container.add(ref);
+ }
+ */
+ }
+ }
+
+ public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ removeSubscriberMessageContainer(key);
+ }
+
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+ return (SubscriptionInfo[])subscriberContainer.values().toArray(
+ new SubscriptionInfo[subscriberContainer.size()]);
+ }
+
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriberName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+ return container != null ? container.size() : 0;
+ }
+
+ public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
+ return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+ }
+
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+ MessageRecoveryListener listener) throws Exception{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+ if(container!=null){
+ int count=0;
+ StoreEntry entry=container.getBatchEntry();
+ if(entry==null){
+ entry=container.getEntry();
+ }else{
+ entry=container.refreshEntry(entry);
+ if(entry!=null){
+ entry=container.getNextEntry(entry);
+ }
+ }
+ if(entry!=null){
+ do{
+ ConsumerMessageRef consumerRef=container.get(entry);
+ Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
+ if(msg!=null){
+ recover(listener,msg);
+ count++;
+ }
+ container.setBatchEntry(entry);
+ entry=container.getNextEntry(entry);
+ }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+ }
+ }
+ listener.finished();
+ }
+
+ public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+ throws Exception{
+
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+ if(container!=null){
+ for(Iterator i=container.iterator();i.hasNext();){
+ ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+ Object msg=messageContainer.get(ref.getMessageEntry());
+ if(msg!=null){
+ recover(listener,msg);
+ }
+ }
+ }
+ listener.finished();
+ }
+
+ public synchronized void resetBatching(String clientId,String subscriptionName){
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
+ if(topicSubContainer!=null){
+ topicSubContainer.reset();
+ }
+ }
+
+ protected void removeSubscriberMessageContainer(Object key) throws IOException{
+ subscriberContainer.remove(key);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
+ for(Iterator i=container.iterator();i.hasNext();){
+ ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+ if(ref!=null){
+ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+ if(tsa!=null){
+ if(tsa.decrementCount()<=0){
+ ackContainer.remove(ref.getAckEntry());
+ messageContainer.remove(tsa.getMessageEntry());
+ }else{
+ ackContainer.update(ref.getAckEntry(),tsa);
+ }
+ }
+ }
+ }
+ store.deleteListContainer(key,"topic-subs-references");
+ }
+
+ protected String getSubscriptionKey(String clientId,String subscriberName){
+ String result=clientId+":";
+ result+=subscriberName!=null?subscriberName:"NOT_SET";
+ return result;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java Thu Mar 8 06:20:29 2007
@@ -1,74 +1,74 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-/**
- * An optimized buffered input stream for Tcp
- *
- * @version $Revision: 1.1.1.1 $
- */
-public class NIOInputStream extends InputStream {
-
- protected int count;
- protected int position;
- private final ByteBuffer in;
-
- public NIOInputStream(ByteBuffer in){
- this.in = in;
- }
-
- public int read() throws IOException {
- try {
- int rc = in.get()& 0xff;
- return rc;
- } catch ( BufferUnderflowException e ) {
- return -1;
- }
- }
-
- public int read(byte b[],int off,int len) throws IOException{
- if( in.hasRemaining() ) {
- int rc = Math.min(len, in.remaining());
- in.get(b, off, rc);
- return rc;
- } else {
- return len == 0 ? 0 : -1;
- }
- }
-
- public long skip(long n) throws IOException{
- int rc = Math.min((int)n, in.remaining());
- in.position(in.position()+rc);
- return rc;
- }
-
- public int available() throws IOException{
- return in.remaining();
- }
-
- public boolean markSupported(){
- return false;
- }
-
- public void close() throws IOException{
- }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+/**
+ * An optimized buffered input stream for Tcp
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NIOInputStream extends InputStream {
+
+ protected int count;
+ protected int position;
+ private final ByteBuffer in;
+
+ public NIOInputStream(ByteBuffer in){
+ this.in = in;
+ }
+
+ public int read() throws IOException {
+ try {
+ int rc = in.get()& 0xff;
+ return rc;
+ } catch ( BufferUnderflowException e ) {
+ return -1;
+ }
+ }
+
+ public int read(byte b[],int off,int len) throws IOException{
+ if( in.hasRemaining() ) {
+ int rc = Math.min(len, in.remaining());
+ in.get(b, off, rc);
+ return rc;
+ } else {
+ return len == 0 ? 0 : -1;
+ }
+ }
+
+ public long skip(long n) throws IOException{
+ int rc = Math.min((int)n, in.remaining());
+ in.position(in.position()+rc);
+ return rc;
+ }
+
+ public int available() throws IOException{
+ return in.remaining();
+ }
+
+ public boolean markSupported(){
+ return false;
+ }
+
+ public void close() throws IOException{
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Thu Mar 8 06:20:29 2007
@@ -1,182 +1,182 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * An optimized buffered outputstream for Tcp
- *
- * @version $Revision: 1.1.1.1 $
- */
-
-public class NIOOutputStream extends OutputStream {
-
- private final static int BUFFER_SIZE = 8192;
-
- private final WritableByteChannel out;
- private final byte[] buffer;
- private final ByteBuffer byteBuffer;
-
- private int count;
- private boolean closed;
-
- /**
- * Constructor
- *
- * @param out
- */
- public NIOOutputStream(WritableByteChannel out) {
- this(out, BUFFER_SIZE);
- }
-
- /**
- * Creates a new buffered output stream to write data to the specified underlying output stream with the specified
- * buffer size.
- *
- * @param out the underlying output stream.
- * @param size the buffer size.
- * @throws IllegalArgumentException if size <= 0.
- */
- public NIOOutputStream(WritableByteChannel out, int size) {
- this.out = out;
- if (size <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- buffer = new byte[size];
- byteBuffer = ByteBuffer.wrap(buffer);
- }
-
- /**
- * write a byte on to the stream
- *
- * @param b - byte to write
- * @throws IOException
- */
- public void write(int b) throws IOException {
- checkClosed();
- if (availableBufferToWrite() < 1) {
- flush();
- }
- buffer[count++] = (byte) b;
- }
-
-
- /**
- * write a byte array to the stream
- *
- * @param b the byte buffer
- * @param off the offset into the buffer
- * @param len the length of data to write
- * @throws IOException
- */
- public void write(byte b[], int off, int len) throws IOException {
- checkClosed();
- if (availableBufferToWrite() < len) {
- flush();
- }
- if (buffer.length >= len) {
- System.arraycopy(b, off, buffer, count, len);
- count += len;
- }
- else {
- write( ByteBuffer.wrap(b, off, len));
- }
- }
-
- /**
- * flush the data to the output stream
- * This doesn't call flush on the underlying outputstream, because
- * Tcp is particularly efficent at doing this itself ....
- *
- * @throws IOException
- */
- public void flush() throws IOException {
- if (count > 0 && out != null) {
- byteBuffer.position(0);
- byteBuffer.limit(count);
- write(byteBuffer);
- count = 0;
- }
- }
-
- /**
- * close this stream
- *
- * @throws IOException
- */
- public void close() throws IOException {
- super.close();
- closed = true;
- }
-
-
- /**
- * Checks that the stream has not been closed
- *
- * @throws IOException
- */
- protected void checkClosed() throws IOException {
- if (closed) {
- throw new EOFException("Cannot write to the stream any more it has already been closed");
- }
- }
-
- /**
- * @return the amount free space in the buffer
- */
- private int availableBufferToWrite() {
- return buffer.length - count;
- }
-
- protected void write(ByteBuffer data) throws IOException {
- int remaining = data.remaining();
- int lastRemaining = remaining-1;
- long delay=1;
- while( remaining > 0 ) {
-
- // We may need to do a little bit of sleeping to avoid a busy loop.
- // Slow down if no data was written out..
- if( remaining == lastRemaining ) {
- try {
- // Use exponential rollback to increase sleep time.
- Thread.sleep(delay);
- delay *= 2;
- if( delay > 1000 ) {
- delay = 1000;
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- } else {
- delay = 1;
- }
- lastRemaining = remaining;
-
- // Since the write is non-blocking, all the data may not have been written.
- out.write( data );
- remaining = data.remaining();
- }
- }
-
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A buffered {@link OutputStream} that writes to a {@link WritableByteChannel}.
+ * <p>
+ * Bytes are collected in an internal array and handed to the channel when the
+ * array cannot take the next write or when {@link #flush()} is called. The
+ * channel may be non-blocking: {@link #write(ByteBuffer)} keeps retrying, with a
+ * growing pause between attempts, until the channel has accepted every byte.
+ * <p>
+ * This class does no synchronization of its own.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NIOOutputStream extends OutputStream {
+
+    private static final int BUFFER_SIZE = 8192;
+
+    /** Upper bound, in milliseconds, for the pause between stalled write attempts. */
+    private static final long MAX_RETRY_DELAY = 1000;
+
+    private final WritableByteChannel out;
+    private final byte[] buffer;
+    /** View over {@link #buffer}, used to hand the buffered bytes to the channel. */
+    private final ByteBuffer byteBuffer;
+
+    /** Number of valid bytes currently held in {@link #buffer}. */
+    private int count;
+    private boolean closed;
+
+    /**
+     * Creates a stream with the default buffer size of 8192 bytes.
+     *
+     * @param out the channel that receives the data
+     */
+    public NIOOutputStream(WritableByteChannel out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a stream that buffers up to {@code size} bytes before writing to
+     * the channel.
+     *
+     * @param out the channel that receives the data
+     * @param size the buffer size in bytes
+     * @throws IllegalArgumentException if size <= 0
+     */
+    public NIOOutputStream(WritableByteChannel out, int size) {
+        this.out = out;
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buffer = new byte[size];
+        byteBuffer = ByteBuffer.wrap(buffer);
+    }
+
+    /**
+     * Buffers a single byte, flushing first when the buffer is full.
+     *
+     * @param b the byte to write (only the low eight bits are used)
+     * @throws IOException if the stream is closed or the flush fails
+     */
+    @Override
+    public void write(int b) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < 1) {
+            flush();
+        }
+        buffer[count++] = (byte) b;
+    }
+
+    /**
+     * Writes part of a byte array.
+     * <p>
+     * If the data does not fit in the free part of the buffer, the buffer is
+     * flushed first. Data that is larger than the whole buffer is then written
+     * straight to the channel; anything else is copied into the buffer.
+     *
+     * @param b the array holding the data
+     * @param off the offset of the first byte to write
+     * @param len the number of bytes to write
+     * @throws IOException if the stream is closed or writing to the channel fails
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < len) {
+            flush();
+        }
+        if (buffer.length >= len) {
+            System.arraycopy(b, off, buffer, count, len);
+            count += len;
+        } else {
+            // Too large to ever fit in the buffer: bypass it. The flush above has
+            // already pushed out any earlier bytes, so ordering is preserved.
+            write(ByteBuffer.wrap(b, off, len));
+        }
+    }
+
+    /**
+     * Writes the buffered bytes to the channel and empties the buffer. Does
+     * nothing when the buffer is empty or the channel is {@code null}.
+     *
+     * @throws IOException if writing to the channel fails
+     */
+    @Override
+    public void flush() throws IOException {
+        if (count > 0 && out != null) {
+            byteBuffer.position(0);
+            byteBuffer.limit(count);
+            write(byteBuffer);
+            count = 0;
+        }
+    }
+
+    /**
+     * Marks this stream as closed so that later {@code write} calls fail. This
+     * method does not itself flush buffered bytes and does not close the channel.
+     *
+     * @throws IOException declared by {@link OutputStream#close()}
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        closed = true;
+    }
+
+    /**
+     * Checks that the stream has not been closed.
+     *
+     * @throws IOException an {@link EOFException} if {@link #close()} has been called
+     */
+    protected void checkClosed() throws IOException {
+        if (closed) {
+            throw new EOFException("Cannot write to the stream any more it has already been closed");
+        }
+    }
+
+    /**
+     * @return the amount of free space in the buffer, in bytes
+     */
+    private int availableBufferToWrite() {
+        return buffer.length - count;
+    }
+
+    /**
+     * Writes every remaining byte of {@code data} to the channel.
+     * <p>
+     * A single channel write may accept only part of the data, or none of it.
+     * Whenever an attempt makes no progress the thread sleeps before retrying;
+     * the sleep starts at 1 ms, doubles after each stalled attempt up to
+     * {@value #MAX_RETRY_DELAY} ms, and resets to 1 ms once progress is made.
+     *
+     * @param data the bytes to write, from its position to its limit
+     * @throws InterruptedIOException if the thread is interrupted while pausing;
+     *             the thread's interrupt status is set again before throwing
+     * @throws IOException if the channel write fails
+     */
+    protected void write(ByteBuffer data) throws IOException {
+        int remaining = data.remaining();
+        // Start with a value that differs from remaining so the first pass never sleeps.
+        int lastRemaining = remaining - 1;
+        long delay = 1;
+        while (remaining > 0) {
+
+            // Slow down if the previous attempt wrote nothing, to avoid a busy loop.
+            if (remaining == lastRemaining) {
+                try {
+                    // Exponential backoff, capped at MAX_RETRY_DELAY.
+                    Thread.sleep(delay);
+                    delay = Math.min(delay * 2, MAX_RETRY_DELAY);
+                } catch (InterruptedException e) {
+                    // Catching InterruptedException clears the interrupt status; set it
+                    // again so code further up the stack can still see the interruption.
+                    Thread.currentThread().interrupt();
+                    InterruptedIOException failure =
+                        new InterruptedIOException("Interrupted while waiting to write to the channel");
+                    failure.initCause(e);
+                    throw failure;
+                }
+            } else {
+                delay = 1;
+            }
+            lastRemaining = remaining;
+
+            // Since the write may be non-blocking, all the data may not have been written.
+            out.write(data);
+            remaining = data.remaining();
+        }
+    }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Thu Mar 8 06:20:29 2007
@@ -1,156 +1,156 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import javax.net.SocketFactory;
-
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-
-/**
- * An implementation of the {@link Transport} interface using raw tcp/ip
- *
- * @version $Revision$
- */
-public class NIOTransport extends TcpTransport {
-
- //private static final Log log = LogFactory.getLog(NIOTransport.class);
- private SocketChannel channel;
- private SelectorSelection selection;
- private ByteBuffer inputBuffer;
- private ByteBuffer currentBuffer;
- private int nextFrameSize;
-
- public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
- super(wireFormat, socketFactory, remoteLocation, localLocation);
- }
-
- public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
- super(wireFormat, socket);
- }
-
- protected void initializeStreams() throws IOException {
- channel = socket.getChannel();
- channel.configureBlocking(false);
-
- // listen for events telling us when the socket is readable.
- selection = SelectorManager.getInstance().register(channel,
- new SelectorManager.Listener() {
- public void onSelect(SelectorSelection selection) {
- serviceRead();
- }
- public void onError(SelectorSelection selection, Throwable error) {
- if( error instanceof IOException ) {
- onException((IOException) error);
- } else {
- onException(IOExceptionSupport.create(error));
- }
- }
- });
-
- // Send the data via the channel
-// inputBuffer = ByteBuffer.allocateDirect(8*1024);
- inputBuffer = ByteBuffer.allocate(8*1024);
- currentBuffer = inputBuffer;
- nextFrameSize=-1;
- currentBuffer.limit(4);
- this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024));
-
- }
-
- private void serviceRead() {
- try {
- while( true ) {
-
-
- int readSize = channel.read(currentBuffer);
- if( readSize == -1 ) {
- onException(new EOFException());
- selection.close();
- break;
- }
- if( readSize==0 ) {
- break;
- }
-
- if( currentBuffer.hasRemaining() )
- continue;
-
- // Are we trying to figure out the size of the next frame?
- if( nextFrameSize==-1 ) {
- assert inputBuffer == currentBuffer;
-
- // If the frame is too big to fit in our direct byte buffer,
- // Then allocate a non direct byte buffer of the right size for it.
- inputBuffer.flip();
- nextFrameSize = inputBuffer.getInt()+4;
- if( nextFrameSize > inputBuffer.capacity() ) {
- currentBuffer = ByteBuffer.allocate(nextFrameSize);
- currentBuffer.putInt(nextFrameSize);
- } else {
- inputBuffer.limit(nextFrameSize);
- }
-
- } else {
- currentBuffer.flip();
-
- Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
- doConsume((Command) command);
-
- nextFrameSize=-1;
- inputBuffer.clear();
- inputBuffer.limit(4);
- currentBuffer = inputBuffer;
- }
-
- }
-
- } catch (IOException e) {
- onException(e);
- } catch (Throwable e) {
- onException(IOExceptionSupport.create(e));
- }
- }
-
-
- protected void doStart() throws Exception {
- connect();
- selection.setInterestOps(SelectionKey.OP_READ);
- selection.enable();
- }
-
- protected void doStop(ServiceStopper stopper) throws Exception {
- selection.disable();
- super.doStop(stopper);
- }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+
+/**
+ * A {@link TcpTransport} that reads from and writes to its socket through a
+ * non-blocking {@link SocketChannel}.
+ * <p>
+ * Reads are driven by {@link SelectorManager} callbacks. Incoming data is
+ * treated as a sequence of frames, each starting with a 4 byte length field;
+ * once a whole frame has been buffered it is unmarshalled with the wire format
+ * and passed to {@code doConsume}. Writes go through a {@link NIOOutputStream}
+ * wrapped around the channel.
+ *
+ * @version $Revision$
+ */
+public class NIOTransport extends TcpTransport {
+
+    private SocketChannel channel;
+    /** Selector registration for {@link #channel}; assigned by {@link #initializeStreams()}. */
+    private SelectorSelection selection;
+    /** Reusable 8 KB buffer for frame headers and for frames small enough to fit in it. */
+    private ByteBuffer inputBuffer;
+    /** Buffer currently being filled: {@link #inputBuffer} or a one-off buffer for a large frame. */
+    private ByteBuffer currentBuffer;
+    /** Total size of the frame being read (length field included), or -1 while reading the length field. */
+    private int nextFrameSize;
+
+    public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    /**
+     * Switches the socket's channel to non-blocking mode, registers it with the
+     * {@link SelectorManager} and sets up the input buffer and output stream.
+     *
+     * @throws IOException if the channel cannot be configured or registered
+     */
+    protected void initializeStreams() throws IOException {
+        channel = socket.getChannel();
+        channel.configureBlocking(false);
+
+        // listen for events telling us when the socket is readable.
+        selection = SelectorManager.getInstance().register(channel,
+            new SelectorManager.Listener() {
+                public void onSelect(SelectorSelection selection) {
+                    serviceRead();
+                }
+                public void onError(SelectorSelection selection, Throwable error) {
+                    if (error instanceof IOException) {
+                        onException((IOException) error);
+                    } else {
+                        onException(IOExceptionSupport.create(error));
+                    }
+                }
+            });
+
+        // Start out expecting the 4 byte length field of the first frame.
+        inputBuffer = ByteBuffer.allocate(8 * 1024);
+        currentBuffer = inputBuffer;
+        nextFrameSize = -1;
+        currentBuffer.limit(4);
+
+        // Send the data via the channel
+        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
+    }
+
+    /**
+     * Reads whatever the channel currently has available and dispatches every
+     * frame that becomes complete. Any failure is reported through
+     * {@code onException} rather than thrown.
+     */
+    private void serviceRead() {
+        try {
+            while (true) {
+
+                int readSize = channel.read(currentBuffer);
+                if (readSize == -1) {
+                    // End of stream: report it and give up the selector registration.
+                    onException(new EOFException());
+                    selection.close();
+                    break;
+                }
+                if (readSize == 0) {
+                    // Nothing more to read right now; wait for the next callback.
+                    break;
+                }
+
+                if (currentBuffer.hasRemaining()) {
+                    // The length field or the frame is still incomplete.
+                    continue;
+                }
+
+                // Are we trying to figure out the size of the next frame?
+                if (nextFrameSize == -1) {
+                    assert inputBuffer == currentBuffer;
+
+                    // If the frame is too big to fit in the reusable input buffer,
+                    // then allocate a buffer of the right size just for it.
+                    inputBuffer.flip();
+                    // The code counts the 4 byte length field as part of the frame.
+                    nextFrameSize = inputBuffer.getInt() + 4;
+                    if (nextFrameSize > inputBuffer.capacity()) {
+                        currentBuffer = ByteBuffer.allocate(nextFrameSize);
+                        // NOTE(review): this stores length + 4, not the value read from
+                        // the wire - confirm the wire format does not depend on it.
+                        currentBuffer.putInt(nextFrameSize);
+                    } else {
+                        inputBuffer.limit(nextFrameSize);
+                    }
+
+                } else {
+                    currentBuffer.flip();
+
+                    Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+                    doConsume((Command) command);
+
+                    // Go back to waiting for the length field of the next frame.
+                    nextFrameSize = -1;
+                    inputBuffer.clear();
+                    inputBuffer.limit(4);
+                    currentBuffer = inputBuffer;
+                }
+
+            }
+
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    /**
+     * Connects the transport and starts listening for read events.
+     *
+     * @throws Exception if connecting fails
+     */
+    protected void doStart() throws Exception {
+        // presumably connect() ends up calling initializeStreams(), which assigns selection - verify in TcpTransport
+        connect();
+        selection.setInterestOps(SelectionKey.OP_READ);
+        selection.enable();
+    }
+
+    /**
+     * Stops listening for read events and then stops the underlying transport.
+     *
+     * @param stopper passed through to the superclass
+     * @throws Exception if the superclass fails to stop
+     */
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        // selection is only assigned by initializeStreams(); guard against null so that
+        // stopping a transport whose streams were never set up still reaches super.doStop().
+        if (selection != null) {
+            selection.disable();
+        }
+        super.doStop(stopper);
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java Thu Mar 8 06:20:29 2007
@@ -1,110 +1,110 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
-
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.transport.tcp.TcpTransportServer;
-
-public class NIOTransportFactory extends TcpTransportFactory {
-
- protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
- return new TcpTransportServer(this, location, serverSocketFactory) {
- protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
- return new NIOTransport(format,socket);
- }
- };
- }
-
- protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
- return new NIOTransport(wf, socketFactory, location, localLocation);
- }
-
-
- protected ServerSocketFactory createServerSocketFactory() {
- return new ServerSocketFactory() {
- public ServerSocket createServerSocket(int port) throws IOException {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.socket().bind(new InetSocketAddress(port));
- return serverSocketChannel.socket();
- }
- public ServerSocket createServerSocket(int port, int backlog) throws IOException {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
- return serverSocketChannel.socket();
- }
- public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
- return serverSocketChannel.socket();
- }
- };
- }
-
- protected SocketFactory createSocketFactory() {
- return new SocketFactory() {
-
- public Socket createSocket() throws IOException {
- SocketChannel channel = SocketChannel.open();
- return channel.socket();
- }
-
- public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
- SocketChannel channel = SocketChannel.open();
- channel.connect(new InetSocketAddress(host, port));
- return channel.socket();
- }
-
- public Socket createSocket(InetAddress address, int port) throws IOException {
- SocketChannel channel = SocketChannel.open();
- channel.connect(new InetSocketAddress(address, port));
- return channel.socket();
- }
-
- public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
- SocketChannel channel = SocketChannel.open();
- channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
- channel.connect(new InetSocketAddress(address, port));
- return channel.socket();
- }
-
- public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
- SocketChannel channel = SocketChannel.open();
- channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
- channel.connect(new InetSocketAddress(address, port));
- return channel.socket();
- }
- };
- }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+
+/**
+ * A {@link TcpTransportFactory} that creates {@link NIOTransport} instances and
+ * supplies socket factories whose sockets are backed by NIO channels.
+ */
+public class NIOTransportFactory extends TcpTransportFactory {
+
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory) {
+            protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+                return new NIOTransport(format, socket);
+            }
+        };
+    }
+
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new NIOTransport(wf, socketFactory, location, localLocation);
+    }
+
+    /**
+     * @return a factory whose server sockets are obtained from a {@link ServerSocketChannel}
+     */
+    protected ServerSocketFactory createServerSocketFactory() {
+        return new ServerSocketFactory() {
+            public ServerSocket createServerSocket(int port) throws IOException {
+                // A backlog of 0 asks for the default, the same as bind(endpoint).
+                return bindServerSocket(new InetSocketAddress(port), 0);
+            }
+            public ServerSocket createServerSocket(int port, int backlog) throws IOException {
+                return bindServerSocket(new InetSocketAddress(port), backlog);
+            }
+            public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
+                return bindServerSocket(new InetSocketAddress(ifAddress, port), backlog);
+            }
+        };
+    }
+
+    /**
+     * @return a factory whose sockets are obtained from a {@link SocketChannel}
+     */
+    protected SocketFactory createSocketFactory() {
+        return new SocketFactory() {
+
+            public Socket createSocket() throws IOException {
+                SocketChannel channel = SocketChannel.open();
+                return channel.socket();
+            }
+
+            public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+                return connectSocket(new InetSocketAddress(host, port), null);
+            }
+
+            public Socket createSocket(InetAddress address, int port) throws IOException {
+                return connectSocket(new InetSocketAddress(address, port), null);
+            }
+
+            public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
+                return connectSocket(new InetSocketAddress(address, port), new InetSocketAddress(localAddresss, localPort));
+            }
+
+            public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
+                return connectSocket(new InetSocketAddress(address, port), new InetSocketAddress(localAddresss, localPort));
+            }
+        };
+    }
+
+    /**
+     * Opens a server socket channel and binds its socket, closing the channel
+     * again if the bind fails so that it is not leaked.
+     *
+     * @param endpoint the local address to bind to
+     * @param backlog the requested backlog; 0 or less selects the default
+     * @return the bound server socket of the new channel
+     * @throws IOException if the channel cannot be opened or bound
+     */
+    private static ServerSocket bindServerSocket(InetSocketAddress endpoint, int backlog) throws IOException {
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        boolean bound = false;
+        try {
+            serverSocketChannel.socket().bind(endpoint, backlog);
+            bound = true;
+            return serverSocketChannel.socket();
+        } finally {
+            if (!bound) {
+                closeQuietly(serverSocketChannel);
+            }
+        }
+    }
+
+    /**
+     * Opens a socket channel, optionally binds it to a local address, and
+     * connects it, closing the channel again if any step fails so that it is
+     * not leaked.
+     *
+     * @param remote the address to connect to
+     * @param local the local address to bind to first, or {@code null} for none
+     * @return the connected socket of the new channel
+     * @throws IOException if the channel cannot be opened, bound or connected
+     */
+    private static Socket connectSocket(InetSocketAddress remote, InetSocketAddress local) throws IOException {
+        SocketChannel channel = SocketChannel.open();
+        boolean connected = false;
+        try {
+            if (local != null) {
+                channel.socket().bind(local);
+            }
+            channel.connect(remote);
+            connected = true;
+            return channel.socket();
+        } finally {
+            if (!connected) {
+                closeQuietly(channel);
+            }
+        }
+    }
+
+    /**
+     * Closes a channel during failure cleanup. A failure to close is dropped on
+     * purpose so that the exception that caused the cleanup is the one reported.
+     */
+    private static void closeQuietly(java.io.Closeable channel) {
+        try {
+            channel.close();
+        } catch (IOException ignored) {
+            // The original bind/connect failure is already propagating.
+        }
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java Thu Mar 8 06:20:29 2007
@@ -1,109 +1,109 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * The SelectorManager will manage one Selector and the thread that checks the
- * selector.
- *
- * We may need to consider running more than one thread to check the selector if
- * servicing the selector takes too long.
- *
- * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
- */
-final public class SelectorManager {
-
- static final public SelectorManager singleton = new SelectorManager();
- static SelectorManager getInstance() {
- return singleton;
- }
-
- public interface Listener {
- public void onSelect(SelectorSelection selector);
- public void onError(SelectorSelection selection, Throwable error);
- }
-
- private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory(){
- public Thread newThread(Runnable r) {
- Thread rc = new Thread(r);
- rc.setName("NIO Transport Thread");
- return rc;
- }});
- private Executor channelExecutor = selectorExecutor;
- private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
- private int maxChannelsPerWorker = 64;
-
- public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
- throws IOException {
-
- SelectorWorker worker = null;
- if (freeWorkers.size() > 0) {
- worker = freeWorkers.getFirst();
- } else {
- worker = new SelectorWorker(this);
- freeWorkers.addFirst(worker);
- }
-
- SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);
- return selection;
- }
-
- synchronized void onWorkerFullEvent(SelectorWorker worker) {
- freeWorkers.remove(worker);
- }
-
- synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
- freeWorkers.remove(worker);
- }
-
- synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
- freeWorkers.add(worker);
- }
-
- public Executor getChannelExecutor() {
- return channelExecutor;
- }
-
- public void setChannelExecutor(Executor channelExecutor) {
- this.channelExecutor = channelExecutor;
- }
-
- public int getMaxChannelsPerWorker() {
- return maxChannelsPerWorker;
- }
-
- public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
- this.maxChannelsPerWorker = maxChannelsPerWorker;
- }
-
- public Executor getSelectorExecutor() {
- return selectorExecutor;
- }
-
- public void setSelectorExecutor(Executor selectorExecutor) {
- this.selectorExecutor = selectorExecutor;
- }
-
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Hands out {@link SelectorSelection} registrations for socket channels.
+ * <p>
+ * Each registration is attached to a {@link SelectorWorker}. The manager keeps
+ * a list of workers that can still take more channels and creates a new worker
+ * when that list is empty; workers report back through the {@code onWorker...}
+ * callbacks so the list stays up to date.
+ *
+ * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
+ */
+final public class SelectorManager {
+
+    static final public SelectorManager singleton = new SelectorManager();
+
+    /**
+     * @return the shared manager instance
+     */
+    static SelectorManager getInstance() {
+        return singleton;
+    }
+
+    /**
+     * Callbacks delivered for a registered channel.
+     */
+    public interface Listener {
+        public void onSelect(SelectorSelection selector);
+        public void onError(SelectorSelection selection, Throwable error);
+    }
+
+    /** Cached thread pool whose threads are all named "NIO Transport Thread". */
+    private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+        public Thread newThread(Runnable task) {
+            Thread thread = new Thread(task);
+            thread.setName("NIO Transport Thread");
+            return thread;
+        }
+    });
+    /** Shares the selector executor until replaced through the setter. */
+    private Executor channelExecutor = selectorExecutor;
+    /** Workers that can still accept registrations. */
+    private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
+    private int maxChannelsPerWorker = 64;
+
+    /**
+     * Registers a channel with the first free worker, creating a worker first
+     * if none is free.
+     *
+     * @param socketChannel the channel to register
+     * @param listener receives the select and error callbacks for the channel
+     * @return the new registration
+     * @throws IOException if the worker or the registration cannot be created
+     */
+    public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
+        throws IOException {
+
+        SelectorWorker worker;
+        if (freeWorkers.isEmpty()) {
+            worker = new SelectorWorker(this);
+            freeWorkers.addFirst(worker);
+        } else {
+            worker = freeWorkers.getFirst();
+        }
+
+        return new SelectorSelection(worker, socketChannel, listener);
+    }
+
+    /** Takes a worker that has become full out of the free list. */
+    synchronized void onWorkerFullEvent(SelectorWorker worker) {
+        freeWorkers.remove(worker);
+    }
+
+    /** Takes a worker that has become empty out of the free list. */
+    synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
+        freeWorkers.remove(worker);
+    }
+
+    /** Appends a worker that has room again to the end of the free list. */
+    synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
+        freeWorkers.add(worker);
+    }
+
+    public Executor getChannelExecutor() {
+        return channelExecutor;
+    }
+
+    public void setChannelExecutor(Executor channelExecutor) {
+        this.channelExecutor = channelExecutor;
+    }
+
+    public int getMaxChannelsPerWorker() {
+        return maxChannelsPerWorker;
+    }
+
+    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
+        this.maxChannelsPerWorker = maxChannelsPerWorker;
+    }
+
+    public Executor getSelectorExecutor() {
+        return selectorExecutor;
+    }
+
+    public void setSelectorExecutor(Executor selectorExecutor) {
+        this.selectorExecutor = selectorExecutor;
+    }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Thu Mar 8 06:20:29 2007
@@ -1,72 +1,72 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.activemq.transport.nio.SelectorManager.Listener;
-
-/**
- *
- * @author chirino
- */
-final public class SelectorSelection {
-
- private final SelectorWorker worker;
- private final SelectionKey key;
- private final Listener listener;
- private int interest;
-
-
- public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
- this.worker = worker;
- this.listener = listener;
- this.key = socketChannel.register(worker.selector, 0, this);
- worker.incrementUseCounter();
- }
-
- public void setInterestOps(int ops) {
- interest = ops;
- }
-
- public void enable() {
- key.interestOps(interest);
- worker.selector.wakeup();
- }
-
- public void disable() {
- key.interestOps(0);
- }
-
- public void close() {
- worker.decrementUseCounter();
- key.cancel();
- worker.selector.wakeup();
- }
-
- public void onSelect() {
- listener.onSelect(this);
- }
-
- public void onError(Throwable e) {
- listener.onError(this, e);
- }
-
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activemq.transport.nio.SelectorManager.Listener;
+
+/**
+ *
+ * @author chirino
+ */
+final public class SelectorSelection {
+
+ private final SelectorWorker worker;
+ private final SelectionKey key;
+ private final Listener listener;
+ private int interest;
+
+
+ public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
+ this.worker = worker;
+ this.listener = listener;
+ this.key = socketChannel.register(worker.selector, 0, this);
+ worker.incrementUseCounter();
+ }
+
+ public void setInterestOps(int ops) {
+ interest = ops;
+ }
+
+ public void enable() {
+ key.interestOps(interest);
+ worker.selector.wakeup();
+ }
+
+ public void disable() {
+ key.interestOps(0);
+ }
+
+ public void close() {
+ worker.decrementUseCounter();
+ key.cancel();
+ worker.selector.wakeup();
+ }
+
+ public void onSelect() {
+ listener.onSelect(this);
+ }
+
+ public void onError(Throwable e) {
+ listener.onError(this, e);
+ }
+
}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Thu Mar 8 06:20:29 2007
@@ -1,130 +1,130 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.nio;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-public class SelectorWorker implements Runnable {
-
- private final static AtomicInteger NEXT_ID = new AtomicInteger();
-
- final SelectorManager manager;
- final Selector selector;
- final int id = NEXT_ID.getAndIncrement();
- final AtomicInteger useCounter = new AtomicInteger();
- final private int maxChannelsPerWorker;
-
-
- public SelectorWorker(SelectorManager manager) throws IOException {
- this.manager = manager;
- selector = Selector.open();
- maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
- }
-
- void incrementUseCounter() {
- int use = useCounter.getAndIncrement();
- if( use == 0 ) {
- manager.getSelectorExecutor().execute(this);
- } else if( use+1 == maxChannelsPerWorker ) {
- manager.onWorkerFullEvent(this);
- }
- }
-
- void decrementUseCounter() {
- int use = useCounter.getAndDecrement();
- if (use == 1) {
- manager.onWorkerEmptyEvent(this);
- } else if (use == maxChannelsPerWorker ) {
- manager.onWorkerNotFullEvent(this);
- }
- }
-
- boolean isRunning() {
- return useCounter.get()!=0;
- }
-
- public void run() {
-
- String origName = Thread.currentThread().getName();
- try {
- Thread.currentThread().setName("Selector Worker: " + id);
- while (isRunning()) {
-
- int count = selector.select(10);
- if (count == 0)
- continue;
-
- if (!isRunning())
- return;
-
- // Get a java.util.Set containing the SelectionKey objects
- // for all channels that are ready for I/O.
- Set keys = selector.selectedKeys();
-
- for (Iterator i = keys.iterator(); i.hasNext();) {
- final SelectionKey key = (SelectionKey) i.next();
- i.remove();
-
- final SelectorSelection s = (SelectorSelection) key.attachment();
- try {
- s.disable();
-
- // Kick off another thread to find newly selected keys while we process the
- // currently selected keys
- manager.getChannelExecutor().execute(new Runnable() {
- public void run() {
- try {
- s.onSelect();
- s.enable();
- } catch (Throwable e) {
- s.onError(e);
- }
- }
- });
-
- } catch ( Throwable e ) {
- s.onError(e);
- }
-
- }
-
- }
- } catch (IOException e) {
-
- // Don't accept any more slections
- manager.onWorkerEmptyEvent(this);
-
- // Notify all the selections that the error occurred.
- Set keys = selector.keys();
- for (Iterator i = keys.iterator(); i.hasNext();) {
- SelectionKey key = (SelectionKey) i.next();
- SelectorSelection s = (SelectorSelection) key.attachment();
- s.onError(e);
- }
-
- } finally {
- Thread.currentThread().setName(origName);
- }
- }
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class SelectorWorker implements Runnable {
+
+ private final static AtomicInteger NEXT_ID = new AtomicInteger();
+
+ final SelectorManager manager;
+ final Selector selector;
+ final int id = NEXT_ID.getAndIncrement();
+ final AtomicInteger useCounter = new AtomicInteger();
+ final private int maxChannelsPerWorker;
+
+
+ public SelectorWorker(SelectorManager manager) throws IOException {
+ this.manager = manager;
+ selector = Selector.open();
+ maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+ }
+
+ void incrementUseCounter() {
+ int use = useCounter.getAndIncrement();
+ if( use == 0 ) {
+ manager.getSelectorExecutor().execute(this);
+ } else if( use+1 == maxChannelsPerWorker ) {
+ manager.onWorkerFullEvent(this);
+ }
+ }
+
+ void decrementUseCounter() {
+ int use = useCounter.getAndDecrement();
+ if (use == 1) {
+ manager.onWorkerEmptyEvent(this);
+ } else if (use == maxChannelsPerWorker ) {
+ manager.onWorkerNotFullEvent(this);
+ }
+ }
+
+ boolean isRunning() {
+ return useCounter.get()!=0;
+ }
+
+ public void run() {
+
+ String origName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("Selector Worker: " + id);
+ while (isRunning()) {
+
+ int count = selector.select(10);
+ if (count == 0)
+ continue;
+
+ if (!isRunning())
+ return;
+
+ // Get a java.util.Set containing the SelectionKey objects
+ // for all channels that are ready for I/O.
+ Set keys = selector.selectedKeys();
+
+ for (Iterator i = keys.iterator(); i.hasNext();) {
+ final SelectionKey key = (SelectionKey) i.next();
+ i.remove();
+
+ final SelectorSelection s = (SelectorSelection) key.attachment();
+ try {
+ s.disable();
+
+ // Kick off another thread to find newly selected keys while we process the
+ // currently selected keys
+ manager.getChannelExecutor().execute(new Runnable() {
+ public void run() {
+ try {
+ s.onSelect();
+ s.enable();
+ } catch (Throwable e) {
+ s.onError(e);
+ }
+ }
+ });
+
+ } catch ( Throwable e ) {
+ s.onError(e);
+ }
+
+ }
+
+ }
+ } catch (IOException e) {
+
+ // Don't accept any more slections
+ manager.onWorkerEmptyEvent(this);
+
+ // Notify all the selections that the error occurred.
+ Set keys = selector.keys();
+ for (Iterator i = keys.iterator(); i.hasNext();) {
+ SelectionKey key = (SelectionKey) i.next();
+ SelectorSelection s = (SelectorSelection) key.attachment();
+ s.onError(e);
+ }
+
+ } finally {
+ Thread.currentThread().setName(origName);
+ }
+ }
}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TimeStampTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
------------------------------------------------------------------------------
svn:eol-style = native