You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC

svn commit: r719706 [4/6] - in /activemq/activemq-blaze: ./ branches/ tags/ trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/ trunk/src/main/java/org/ trunk/src/main/java/org/apache/ trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Maintains the members of a group.
+ * <p>
+ * Tracks the members known to the local channel, indexes them by the queue and
+ * topic destinations they advertise, and notifies registered
+ * {@link MemberChangedListener}s when members start or stop. While started it
+ * periodically broadcasts the local member's data and expires members whose
+ * timestamp is older than the configured heartbeat interval.
+ */
+public class Group extends BaseService {
+    static final Log LOG = LogFactory.getLog(Group.class);
+    final BlazeGroupChannelImpl channel;
+    private final BlazeGroupConfiguration configuration;
+    private ScheduledExecutorService heartBeatService;
+    private ScheduledExecutorService checkMembershipService;
+    /** Known members keyed by id; also the monitor used to wait for new members. */
+    protected Map<String, MemberImpl> members = new ConcurrentHashMap<String, MemberImpl>();
+    private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
+    /** Members grouped by the queue destinations they advertise. */
+    private final Map<Buffer, List<MemberImpl>> queueMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
+    /** Members grouped by the topic destinations they advertise. */
+    private final Map<Buffer, List<MemberImpl>> topicMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
+
+    /**
+     * Constructor
+     * 
+     * @param channel the channel this group belongs to; its group configuration
+     *            is used as the configuration of this group
+     */
+    protected Group(BlazeGroupChannelImpl channel) {
+        this.channel = channel;
+        this.configuration = channel.getGroupConfiguration();
+    }
+
+    /**
+     * @return the Member of the Channel
+     * @throws Exception
+     */
+    public MemberImpl getLocalMember() throws Exception {
+        return this.channel.getLocalMember();
+    }
+
+    /**
+     * Stores the given member in the member map under its own id, replacing any
+     * previous entry for that id.
+     * 
+     * @param member the member to store
+     */
+    void updateLocal(MemberImpl member) {
+        this.members.put(member.getId(), member);
+    }
+
+    /**
+     * @return the id of the local channel associated with this group
+     */
+    public String getId() {
+        return this.channel.getId();
+    }
+
+    /**
+     * @return the name of the local channel
+     */
+    public String getName() {
+        return this.channel.getName();
+    }
+
+    /**
+     * @return the configuration associated with this channel
+     */
+    public BlazeGroupConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * Register a listener for membership changes
+     * 
+     * @param l the listener to add
+     */
+    void addMemberChangedListener(MemberChangedListener l) {
+        this.membershipListeners.add(l);
+    }
+
+    /**
+     * Unregister a listener for membership changes
+     * 
+     * @param l the listener to remove
+     */
+    void removeMemberChangedListener(MemberChangedListener l) {
+        // Previously this called add(l), so listeners could never be removed
+        // and were registered a second time instead.
+        this.membershipListeners.remove(l);
+    }
+
+    /**
+     * @return a snapshot copy of the current members
+     */
+    Set<Member> getMembers() {
+        return new HashSet<Member>(this.members.values());
+    }
+
+    /**
+     * @return a snapshot copy of the current members from this group
+     */
+    public Set<MemberImpl> getMembersImpl() {
+        return new HashSet<MemberImpl>(this.members.values());
+    }
+
+    /**
+     * Get a member by its unique id
+     * 
+     * @param id
+     * @return the member or null if no member has that id
+     */
+    Member getMemberById(String id) {
+        return this.members.get(id);
+    }
+
+    /**
+     * Return a member of the Group with the matching name
+     * 
+     * @param name
+     * @return the first member found with that name, or null if there is none
+     *         or name is null
+     */
+    Member getMemberByName(String name) {
+        if (name != null) {
+            for (Member member : this.members.values()) {
+                // name is known to be non-null here, so compare from its side
+                // to tolerate a member without a name
+                if (name.equals(member.getName())) {
+                    return member;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Will wait for a member to advertise itself if not available
+     * 
+     * @param name
+     * @param timeout maximum time to wait in milliseconds; a value of zero or
+     *            less waits until the member appears
+     * @return the member or null if the timeout elapsed first
+     * @throws InterruptedException
+     */
+    public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
+        final boolean bounded = timeout > 0;
+        final long deadline = bounded ? System.currentTimeMillis() + timeout : 0L;
+        // The lookup is done while holding the monitor that processMemberStarted()
+        // notifies on, so a member arriving between the lookup and the wait
+        // cannot be missed.
+        synchronized (this.members) {
+            while (true) {
+                Member result = getMemberByName(name);
+                if (result != null) {
+                    return result;
+                }
+                if (!bounded) {
+                    this.members.wait();
+                } else {
+                    long remaining = deadline - System.currentTimeMillis();
+                    if (remaining <= 0) {
+                        // wait(0) would block forever, so give up explicitly
+                        return null;
+                    }
+                    this.members.wait(remaining);
+                }
+            }
+        }
+    }
+
+    /**
+     * Initializes the service and, if that succeeded, registers the local member
+     * under the channel id.
+     * 
+     * @return the result of the super class initialization
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#init()
+     */
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.members.put(this.channel.getId(), this.channel.getLocalMember());
+        }
+        return result;
+    }
+
+    /**
+     * @return the result of the super class shut down
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#shutDown()
+     */
+    public boolean shutDown() throws Exception {
+        return super.shutDown();
+    }
+
+    /**
+     * Starts the service and, if that succeeded, sends an initial heartbeat on
+     * the calling thread and schedules the periodic heartbeat and membership
+     * check tasks.
+     * 
+     * @return the result of the super class start
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            this.heartBeatService = newDaemonScheduler();
+            Runnable heartbeat = new Runnable() {
+                public void run() {
+                    try {
+                        broadcastHeartBeat(getLocalMember());
+                    } catch (Exception e) {
+                        LOG.error("Failed to send heartbeat",e);
+                    }
+                }
+            };
+            // advertise immediately rather than waiting for the first interval
+            heartbeat.run();
+            int interval = this.configuration.getHeartBeatInterval();
+            this.heartBeatService.scheduleAtFixedRate(heartbeat, interval, interval, TimeUnit.MILLISECONDS);
+            this.checkMembershipService = newDaemonScheduler();
+            Runnable checkMembership = new Runnable() {
+                public void run() {
+                    if (isStarted()) {
+                        try {
+                            checkMembership();
+                        } catch (Exception e) {
+                            LOG.error("Failed to checkMembership", e);
+                        }
+                    }
+                }
+            };
+            this.checkMembershipService.scheduleAtFixedRate(checkMembership, interval / 3, interval / 2,
+                    TimeUnit.MILLISECONDS);
+        }
+        return result;
+    }
+
+    /**
+     * Creates a single threaded scheduler whose thread is a daemon thread.
+     */
+    private static ScheduledExecutorService newDaemonScheduler() {
+        return Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
+
+    /**
+     * Stops the service and, if that succeeded, shuts down the heartbeat and
+     * membership check schedulers.
+     * 
+     * @return the result of the super class stop
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            if (this.heartBeatService != null) {
+                this.heartBeatService.shutdown();
+            }
+            if (this.checkMembershipService != null) {
+                this.checkMembershipService.shutdown();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Process member data received for a member. Data carrying the id of the
+     * local member is ignored. A previously unknown member is recorded,
+     * announced to the listeners and sent the local member data; a known member
+     * is replaced and has its destinations re-indexed if the data says they
+     * changed.
+     * 
+     * @param data
+     * @throws Exception
+     * @return Member if a new member else null
+     */
+    protected MemberImpl processMember(MemberData data) throws Exception {
+        MemberImpl result = null;
+        MemberImpl old = null;
+        MemberImpl member = new MemberImpl(data);
+        if (!member.getId().equals(getLocalMember().getId())) {
+            member.setTimeStamp(System.currentTimeMillis());
+            if ((old = this.members.put(member.getId(), member)) == null) {
+                processMemberStarted(member);
+                if (!member.getId().equals(this.channel.getId())) {
+                    this.channel.sendMessage(member.getAddress(), MessageType.MEMBER_DATA, this.channel
+                            .getLocalMember().getData());
+                }
+                result = member;
+            } else {
+                if (data.getDestinationsChanged()) {
+                    processMemberUpdate(old, member);
+                }
+            }
+        }
+        return result;
+    }
+
+    private void fireMemberStarted(Member member) {
+        LOG.debug(this.channel.getName() + " Member started " + member);
+        for (MemberChangedListener l : this.membershipListeners) {
+            l.memberStarted(member);
+        }
+    }
+
+    private void fireMemberStopped(Member member) {
+        LOG.debug(this.channel.getName() + " Member stopped " + member);
+        for (MemberChangedListener l : this.membershipListeners) {
+            l.memberStopped(member);
+        }
+    }
+
+    /**
+     * While started, removes every member other than the local one whose
+     * timestamp is older than one heartbeat interval and processes it as
+     * stopped.
+     * 
+     * @throws Exception
+     */
+    void checkMembership() throws Exception {
+        if (isStarted()) {
+            long checkTime = System.currentTimeMillis() - this.configuration.getHeartBeatInterval();
+            for (MemberImpl member : this.members.values()) {
+                if (!member.getId().equals(getId()) && member.getTimeStamp() < checkTime) {
+                    LOG.debug(getId() +" Member timestamp expired " + member);
+                    this.members.remove(member.getId());
+                    processMemberStopped(member);
+                }
+            }
+        }
+    }
+
+    /**
+     * Indexes the destinations of a newly started member, informs the listeners
+     * and wakes up any thread waiting in
+     * {@link #getAndWaitForMemberByName(String, int)}.
+     * 
+     * @param member
+     * @throws Exception
+     */
+    protected void processMemberStarted(MemberImpl member) throws Exception {
+        processDestinationsForStarted(member);
+        fireMemberStarted(member);
+        synchronized(this.members) {
+            this.members.notifyAll();
+        }
+    }
+
+    /**
+     * Informs the listeners that a member stopped and removes it from the
+     * destination indexes.
+     * 
+     * @param member
+     * @throws Exception
+     */
+    protected void processMemberStopped(MemberImpl member) throws Exception {
+        fireMemberStopped(member);
+        processDestinationsForStopped(member);
+    }
+
+    /**
+     * Selects the topic or queue index depending on the kind of destination.
+     */
+    private Map<Buffer, List<MemberImpl>> destinationMapFor(DestinationData dest) {
+        return dest.getTopic() ? this.topicMap : this.queueMap;
+    }
+
+    private void processDestinationsForStarted(MemberImpl member) {
+        List<DestinationData> destList = member.getData().getDestinationList();
+        for (DestinationData dest : destList) {
+            Buffer key = dest.getDestination();
+            Map<Buffer, List<MemberImpl>> map = destinationMapFor(dest);
+            List<MemberImpl> subscribers = map.get(key);
+            if (subscribers == null) {
+                subscribers = new CopyOnWriteArrayList<MemberImpl>();
+                map.put(key, subscribers);
+            }
+            subscribers.add(member);
+        }
+    }
+
+    private void processDestinationsForStopped(MemberImpl member) {
+        List<DestinationData> destList = member.getData().getDestinationList();
+        for (DestinationData dest : destList) {
+            Buffer key = dest.getDestination();
+            Map<Buffer, List<MemberImpl>> map = destinationMapFor(dest);
+            List<MemberImpl> subscribers = map.get(key);
+            if (subscribers != null) {
+                subscribers.remove(member);
+                // drop the entry once nobody is left on the destination
+                if (subscribers.isEmpty()) {
+                    map.remove(key);
+                }
+            }
+        }
+    }
+
+    /**
+     * Re-indexes the destinations of a member whose data was replaced: the
+     * destinations of the old data are removed, those of the new data are added.
+     * 
+     * @param oldMember
+     * @param newMember
+     * @throws Exception
+     */
+    protected void processMemberUpdate(MemberImpl oldMember, MemberImpl newMember) throws Exception {
+        processDestinationsForStopped(oldMember);
+        processDestinationsForStarted(newMember);
+    }
+
+    /**
+     * @return the queueMap
+     */
+    protected Map<Buffer, List<MemberImpl>> getQueueMap() {
+        return this.queueMap;
+    }
+
+    /**
+     * @return the topicMap
+     */
+    protected Map<Buffer, List<MemberImpl>> getTopicMap() {
+        return this.topicMap;
+    }
+
+    /**
+     * Broadcasts the data of the given member on the channel, but only while
+     * this service is started.
+     * 
+     * @param local the member whose data is broadcast
+     * @throws Exception
+     */
+    protected void broadcastHeartBeat(MemberImpl local) throws Exception {
+        if (isStarted()) {
+            Group.this.channel.broadcastMessage(MessageType.MEMBER_DATA, local.getData());
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+
+/**
+ * A <CODE>Member</CODE> exposes the information held about one member of the
+ * group.
+ */
+public interface Member {
+
+    /**
+     * @return the name of this member
+     */
+    String getName();
+
+    /**
+     * @return the id of this member
+     */
+    String getId();
+
+    /**
+     * @return the start time of this member
+     */
+    long getStartTime();
+
+    /**
+     * @return the current timestamp of this member
+     */
+    long getTimeStamp();
+
+    /**
+     * Replace the timestamp of this member
+     * 
+     * @param value the new timestamp
+     */
+    void setTimeStamp(long value);
+
+    /**
+     * @return the inbox destination of this member
+     */
+    String getInBoxDestination();
+
+    /**
+     * @return the coordinator weight of this member
+     */
+    long getCoordinatorWeight();
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeblaze.group;
+
+
+/**
+ * Callback interface for being told about membership changes of a group
+ */
+public interface MemberChangedListener {
+
+    /**
+     * Called to signal that a member has started
+     * 
+     * @param member the member that started
+     */
+    void memberStarted(Member member);
+
+    /**
+     * Called to signal that a member has stopped
+     * 
+     * @param member the member that stopped
+     */
+    void memberStopped(Member member);
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * {@link Member} implementation backed by a {@link MemberData} instance, which
+ * additionally keeps the socket address of the member.
+ */
+public class MemberImpl implements Member {
+    private final MemberData data;
+    private final InetSocketAddress socketAddress;
+    private final Buffer socketAddressAsBuffer;
+
+    /**
+     * Creates a member from its individual values. The start time is set to the
+     * current time and the address is resolved from the host and port of the
+     * given URI.
+     * 
+     * @param id
+     * @param name
+     * @param coordinatorWeight
+     * @param localURI supplies the host and port of the member
+     * @throws Exception
+     */
+    public MemberImpl(String id,String name,long coordinatorWeight,URI localURI) throws Exception {
+        final InetAddress resolved = InetAddress.getByName(localURI.getHost());
+        final int port = localURI.getPort();
+        this.socketAddress = new InetSocketAddress(resolved, port);
+        this.socketAddressAsBuffer = new Buffer(this.socketAddress.toString());
+        MemberData memberData = new MemberData();
+        memberData.setId(id);
+        memberData.setName(name);
+        memberData.setCoordinatorWeight(coordinatorWeight);
+        memberData.setStartTime(System.currentTimeMillis());
+        memberData.setInetAddress(new Buffer(resolved.getHostAddress()));
+        memberData.setPort(port);
+        this.data = memberData;
+    }
+
+    /**
+     * Creates a member around existing data; the address is resolved from the
+     * inet address and port stored in that data.
+     * 
+     * @param data
+     * @throws Exception
+     */
+    public MemberImpl(MemberData data) throws Exception {
+        this.data = data;
+        String host = data.getInetAddress().toStringUtf8();
+        InetAddress resolved = InetAddress.getByName(host);
+        this.socketAddress = new InetSocketAddress(resolved, data.getPort());
+        this.socketAddressAsBuffer = new Buffer(this.socketAddress.toString());
+    }
+
+    /**
+     * @return the name held in the data
+     */
+    public String getName() {
+        return this.data.getName();
+    }
+
+    /**
+     * @return the id held in the data
+     */
+    public String getId() {
+        return this.data.getId();
+    }
+
+    /**
+     * Replace the id held in the data
+     * 
+     * @param id
+     */
+    void setId(String id) {
+        this.data.setId(id);
+    }
+
+    /**
+     * @return the start time held in the data
+     */
+    public long getStartTime() {
+        return this.data.getStartTime();
+    }
+
+    /**
+     * @return the inbox destination, which is the id of this member
+     */
+    public String getInBoxDestination() {
+        return this.data.getId();
+    }
+
+    /**
+     * @return the SocketAddress resolved for this member
+     */
+    public InetSocketAddress getAddress () {
+        return this.socketAddress;
+    }
+
+    /**
+     * @return the string form of the socket address wrapped in a Buffer
+     */
+    public Buffer getAddressAsBuffer() {
+        return this.socketAddressAsBuffer;
+    }
+
+    /**
+     * @return the timestamp held in the data
+     */
+    public long getTimeStamp() {
+        return this.data.getTimeStamp();
+    }
+
+    /**
+     * Replace the timestamp held in the data
+     * 
+     * @param value
+     */
+    public void setTimeStamp(long value) {
+        this.data.setTimeStamp(value);
+    }
+
+    /**
+     * @return the coordinator weight held in the data
+     */
+    public long getCoordinatorWeight() {
+        return this.data.getCoordinatorWeight();
+    }
+
+    /**
+     * @return the name followed by the id in square brackets
+     */
+    public String toString() {
+        return this.data.getName()+"["+this.data.getId()+"]";
+    }
+
+    /**
+     * @return the hash code of the id, consistent with {@link #equals(Object)}
+     */
+    public int hashCode() {
+        return this.data.getId().hashCode();
+    }
+
+    /**
+     * Two members are equal when both are MemberImpl instances with equal ids.
+     */
+    public boolean equals(Object obj) {
+        if (!(obj instanceof MemberImpl)) {
+            return false;
+        }
+        return this.data.getId().equals(((MemberImpl) obj).data.getId());
+    }
+
+    /**
+     * @return the underlying data
+     */
+    public MemberData getData() {
+        return this.data;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Callback for being told that a request has finished
+ */
+public interface RequestCallback {
+    /**
+     * Optionally invoked once a request is finished
+     * 
+     * @param id the id passed along when the request finished
+     */
+    void finished(Buffer id);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author state on a request
+ *
+ */
+class SendRequest {
+    private static final Log LOG = LogFactory.getLog(SendRequest.class);
+    private final AtomicBoolean done = new AtomicBoolean();
+    private Message<?> response;
+    private RequestCallback callback;
+
+    Object get(long timeout) {
+        synchronized (this.done) {
+            if (this.done.get() == false && this.response == null) {
+                try {
+                    this.done.wait(timeout);
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted in  get("+timeout+")",e);
+                }
+            }
+        }
+        return this.response;
+    }
+
+    void put(Buffer id,Message<?> response) {
+        this.response = response;
+        cancel();
+        RequestCallback callback = this.callback;
+        if (callback != null) {
+            callback.finished(id);
+        }
+    }
+
+    void cancel() {
+        this.done.set(true);
+        synchronized (this.done) {
+            this.done.notifyAll();
+        }
+    }
+    
+     void setCallback(RequestCallback callback) {
+        this.callback=callback;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,26 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Group channel for true point-to-point communication using unicast, and for
+managing group membership.
+
+</body>
+</html>
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.destination;
+
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Matches a Destination Subject to wildcards
+ * 
+ */
+public final class DestinationMatch {
+    static final byte MATCH_ELEMENT = '*';
+    static final byte MATCH_ALL = '>';
+    static final byte[] DELIMETERS = { '.', '/', '|' };
+
+    /**
+     * See if the destination matches with wild cards
+     * 
+     * @param destination
+     * @param match
+     * @return true if its a match
+     */
+    public static boolean isMatch(String destination, String match) {
+        return isMatch(new Buffer(destination), new Buffer(match));
+    }
+    /**
+     * See if the destination matches with wild cards
+     * 
+     * @param destination
+     * @param match
+     * @return true if its a match
+     */
+    public static boolean isMatch(Buffer destination, String match) {
+        return isMatch(destination, new Buffer(match));
+    }
+    
+    /**
+     * See if the destination matches with wild cards
+     * 
+     * @param destination
+     * @param match
+     * @return true if its a match
+     */
+    public static boolean isMatch(String destination, Buffer match) {
+        return isMatch(new Buffer(destination), match);
+    }
+
+    /**
+     * See if the destination matches with wild cards
+     * 
+     * @param destination
+     * @param match
+     * @return
+     */
+    public static boolean isMatch(Buffer destination, Buffer match) {
+        boolean result = true;
+        boolean matchAll = false;
+        if (destination == null && match == null) {
+            return true;
+        }
+        if (destination == null || match == null) {
+            return false;
+        }
+        int destinationOffset = 0;
+        int matchOffset = 0;
+        while (destinationOffset < destination.length
+                && matchOffset < match.length) {
+            byte matchByte = match.byteAt(matchOffset);
+            byte destinationByte = destination.byteAt(destinationOffset);
+            if (matchByte != destinationByte || matchByte == MATCH_ALL || destinationByte == MATCH_ALL
+                    || matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT) {
+                if (matchByte == MATCH_ALL || destinationByte == MATCH_ALL) {
+                    matchAll = true;
+                    break;
+                } else if ((matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT)
+                        && (isMatchElement(match, matchOffset, match.length) || isMatchElement(destination,
+                                destinationOffset, destination.length))) {
+                    if (containsDelimeter(destination, destinationOffset, destination.length) == false
+                            && containsDelimeter(match, matchOffset, match.length) == false) {
+                        break;
+                    } else {
+                        matchOffset = offsetToNextToken(match, matchOffset, match.length);
+                        destinationOffset = offsetToNextToken(destination, destinationOffset, destination.length);
+                        continue;
+                    }
+                } else {
+                    result = false;
+                    break;
+                }
+            }
+            matchOffset++;
+            destinationOffset++;
+        }
+        if (result
+                && (match.length != destination.length && matchOffset != match.length || destinationOffset != destination.length)
+                && !matchAll) {
+            result = isMatchAll(match, matchOffset, match.length)
+                    || isMatchAll(destination, destinationOffset, destination.length);
+        }
+        return result;
+    }
+
+    static boolean isMatchAll(Buffer str, int offset, int count) {
+        boolean result = false;
+        if (str != null && offset < str.length) {
+            byte offByte = str.byteAt(offset);
+            byte offBytePlusOne = (byte) ((offset + 1) < str.length ? str.byteAt(offset + 1) : ' ');
+            if (offset + 1 < str.length && isDelimiter(offByte)) {
+                if (offBytePlusOne == MATCH_ALL) {
+                    result = true;
+                }
+            } else if ((offByte == MATCH_ALL || offByte == MATCH_ELEMENT)
+                    && (isWhiteSpace(str, offset + 1, count) || isDelimiter(offBytePlusOne) || offset + 1 == str.length)) {
+                result = true;
+            }
+        }
+        return result;
+    }
+
+    static boolean isMatchElement(Buffer str, int offset, int count) {
+        boolean result = false;
+        if (str.byteAt(offset) == MATCH_ELEMENT) {
+            if (offset == 0 || isDelimiter(str.byteAt(offset - 1))) {
+                result = ((offset + 1) >= str.length) || isDelimiter(str.byteAt(offset + 1))
+                        || isWhiteSpace(str, offset + 1, count);
+            }
+        }
+        return result;
+    }
+
+    private static boolean isWhiteSpace(Buffer str, int offset, int len) {
+        boolean result = true;
+        while ((offset < len)) {
+            if (str.byteAt(offset++) > ' ') {
+                result = false;
+                break;
+            }
+        }
+        return result;
+    }
+
+    static int offsetToNextToken(Buffer str, int offset, int len) {
+        while (offset < len) {
+            if (isDelimiter(str.byteAt(offset)))
+                break;
+            offset++;
+        }
+        return offset;
+    }
+
+    static int offsetToNextElement(Buffer str, int offset, int len) {
+        int result = -1;
+        int count = offset;
+        while (count < len) {
+            if (isDelimiter(str.byteAt(count))) {
+                result = ++count;
+                // check for double placed elements ...
+                while (count < len && isDelimiter(str.byteAt(count))) {
+                    result = ++count;
+                }
+                break;
+            }
+            count++;
+        }
+        return result;
+    }
+
+    private static boolean containsDelimeter(Buffer str, int offset, int len) {
+        boolean result = false;
+        while (offset < len) {
+            if (isDelimiter(str.byteAt(offset++))) {
+                result = true;
+                break;
+            }
+        }
+        return result;
+    }
+
+    private static final boolean isDelimiter(byte b) {
+        for (int i = 0; i < DELIMETERS.length; i++) {
+            if (b == DELIMETERS[i]) {
+                return true;
+            }
+        }
+        return false;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+destination utility classes
+
+</body>
+</html>
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Chains Processors together.
+ * <p>
+ * Each link holds a reference to the next and the previous processor.
+ * Life-cycle calls and down stream packets are handed to the next processor,
+ * up stream packets to the previous one.
+ */
+public class ChainedProcessor extends BaseService implements Processor {
+    private static final Log LOG = LogFactory.getLog(ChainedProcessor.class);
+    private Processor next;
+    private Processor prev;
+    protected ExceptionListener exceptionListener;
+
+    protected ChainedProcessor() {
+    }
+
+    /**
+     * @return the next
+     */
+    public Processor getNext() {
+        return this.next;
+    }
+
+    /**
+     * Set Next at the end of the chain.
+     * <p>
+     * The chain is followed for as long as its links are ChainedProcessors.
+     * The given processor becomes the next of the last such link, replacing
+     * any plain Processor that terminated the chain before.
+     * 
+     * @param next
+     *            the processor to put at the end, may be null
+     */
+    public void setEnd(Processor next) {
+        ChainedProcessor target = lastChained();
+        if (next instanceof ChainedProcessor) {
+            // lets the tail link itself up as prev and hand on its listener
+            target.setNextChain((ChainedProcessor) next);
+        } else {
+            target.next = next;
+        }
+    }
+    
+    /**
+     * Set the next
+     * @param next
+     */
+    public void setNext(Processor next) {
+        this.next=next;
+    }
+
+    /**
+     * @return the prev
+     */
+    public Processor getPrev() {
+        return this.prev;
+    }
+
+    /**
+     * Set the next chain.
+     * <p>
+     * Appends p after the last ChainedProcessor reachable from this one,
+     * makes that link the prev of p and, if p has no exception listener yet,
+     * gives it the listener of this processor.
+     * 
+     * @param p
+     *            the chain to append
+     */
+    public void setNextChain(ChainedProcessor p) {
+        ChainedProcessor target = lastChained();
+        target.next=p;
+        p.setPrev(target);
+        if (this.exceptionListener != null && p.exceptionListener == null) {
+            p.exceptionListener = this.exceptionListener;
+        }
+    }
+
+    /**
+     * Follows the chain from this processor and returns the last link that
+     * is a ChainedProcessor. The walk stops at the first next that is null
+     * or a plain Processor, so it terminates for every acyclic chain.
+     * 
+     * @return the last ChainedProcessor of the chain, this if there is none
+     *         after it
+     */
+    private ChainedProcessor lastChained() {
+        ChainedProcessor tail = this;
+        Processor candidate = tail.getNext();
+        while (candidate instanceof ChainedProcessor) {
+            tail = (ChainedProcessor) candidate;
+            candidate = tail.getNext();
+        }
+        return tail;
+    }
+
+    /**
+     * @param prev
+     *            the prev to set
+     */
+    public void setPrev(Processor prev) {
+        this.prev = prev;
+    }
+
+    /**
+     * Initializes this processor and, if that succeeded, the next one.
+     * 
+     * @return the result of the last init call made
+     */
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result && this.next != null) {
+            result = this.next.init();
+        }
+        return result;
+    }
+
+    /**
+     * Shuts down this processor and, if that succeeded, the next one.
+     * 
+     * @return the result of the last shutDown call made
+     */
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result && this.next != null) {
+            result = this.next.shutDown();
+        }
+        return result;
+    }
+
+    /**
+     * Starts this processor and, if that succeeded, the next one.
+     * 
+     * @return the result of the last start call made
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result && this.next != null) {
+            result = this.next.start();
+        }
+        return result;
+    }
+
+    /**
+     * Stops this processor and, if that succeeded, the next one.
+     * 
+     * @return the result of the last stop call made
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result && this.next != null) {
+            result = this.next.stop();
+        }
+        return result;
+    }
+
+    /**
+     * Hands the packet to the next processor; does nothing if there is none.
+     */
+    public void downStream(Packet packet) throws Exception {
+        if (this.next != null) {
+            this.next.downStream(packet);
+        }
+    }
+
+    /**
+     * Hands the packet to the previous processor; does nothing if there is
+     * none.
+     */
+    public void upStream(Packet packet) throws Exception {
+        if (this.prev != null) {
+            this.prev.upStream(packet);
+        }
+    }
+
+    public void setExceptionListener(ExceptionListener l) {
+        this.exceptionListener = l;
+    }
+
+    protected void fireException(String reason, Exception e) {
+        doFireException(new BlazeException(reason, e));
+    }
+
+    protected void fireException(Exception e) {
+        doFireException(new BlazeException(e));
+    }
+
+    protected void fireException(String reason) {
+        doFireException(new BlazeException(reason));
+    }
+
+    /**
+     * Passes the exception to the listener, or logs it as an error if no
+     * listener is set.
+     */
+    protected void doFireException(Exception e) {
+        // read the field once so a concurrent change cannot cause a NPE
+        ExceptionListener l = this.exceptionListener;
+        if (l != null) {
+            l.onException(e);
+        } else {
+            LOG.error("No exception listener - caught exception ", e);
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Compresses PacketData.
+ * <p>
+ * Payloads of at least {@link #getCompressionLimit()} bytes are gzipped on
+ * the way down; payloads that start with the gzip magic number are inflated
+ * on the way up.
+ */
+public class CompressionProcessor extends ChainedProcessor {
+    private int compressionLimit = 8192;
+    private int compressionLevel = Deflater.BEST_COMPRESSION;
+    /** A GZIPOutputStream that deflates with the configured level. */
+    private class CompressionStream extends GZIPOutputStream {
+        CompressionStream(OutputStream out, int size) throws IOException {
+            super(out, size);
+            this.def.setLevel(getCompressionLevel());
+        }
+    }
+
+    /**
+     * @return the limit beyond which data will be compressed
+     */
+    public int getCompressionLimit() {
+        return this.compressionLimit;
+    }
+
+    /**
+     * @param compressionLimit -
+     *            set the compressionLimit
+     */
+    public void setCompressionLimit(int compressionLimit) {
+        this.compressionLimit = compressionLimit;
+    }
+
+    /**
+     * Gzips the payload if it has reached the compression limit and passes
+     * the packet on. If compressing fails the exception listener is told and
+     * the packet is passed on with its payload unchanged.
+     */
+    public void downStream(Packet packet) throws Exception {
+        Buffer data = packet.getPacketData().getPayload();
+        // an empty payload is never compressed: there is nothing to gain and
+        // GZIPOutputStream rejects a buffer size of 0
+        if (data != null && data.length > 0 && data.length >= this.compressionLimit) {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(data.length);
+            try {
+                GZIPOutputStream gzipOut = new CompressionStream(bytesOut, data.length);
+                gzipOut.write(data.toByteArray());
+                gzipOut.close();
+                bytesOut.close();
+                byte[] result = bytesOut.toByteArray();
+                packet.getPacketData().clearPayload();
+                // need to clone to get sizing correct
+                packet = packet.clone();
+                packet.getPacketData().setPayload(new Buffer(result));
+            } catch (IOException e) {
+                fireException("Failed to deflate packet", e);
+            }
+        }
+        super.downStream(packet);
+    }
+
+    /**
+     * Inflates the payload if it looks gzipped and passes the packet on. If
+     * inflating fails the exception listener is told and the packet is passed
+     * on with its payload unchanged.
+     */
+    public void upStream(Packet packet) throws Exception {
+        Buffer data = packet.getPacketData().getPayload();
+        if (CompressionProcessor.isCompressed(data)) {
+            InputStream bytesIn = data.newInput();
+            try {
+                GZIPInputStream gzipIn = new GZIPInputStream(bytesIn);
+                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                byte[] buffer = new byte[4096];
+                int bytesRead = 0;
+                while ((bytesRead = gzipIn.read(buffer, 0, buffer.length)) > 0) {
+                    bytesOut.write(buffer, 0, bytesRead);
+                }
+                gzipIn.close();
+                bytesIn.close();
+                byte[] result = bytesOut.toByteArray();
+                bytesOut.close();
+                packet.getPacketData().clearPayload();
+                // need to clone to get sizing correct
+                packet = packet.clone();
+                packet.getPacketData().setPayload(new Buffer(result));
+            } catch (IOException e) {
+                fireException("Failed to inflate packet", e);
+            }
+        }
+        super.upStream(packet);
+    }
+
+    /**
+     * Tests whether the data starts with the gzip magic number. This is a
+     * heuristic: any payload longer than two bytes that happens to begin
+     * with those two bytes is reported as compressed.
+     * 
+     * @param data the payload, may be null
+     * @return true if data is longer than two bytes and starts with the gzip
+     *         magic number
+     */
+    static boolean isCompressed(Buffer data) {
+        boolean result = false;
+        if (data != null && data.length > 2) {
+            // byteAt() takes an index relative to the start of the buffer's
+            // content, so the first two bytes are at 0 and 1 whatever the
+            // buffer's offset into its backing array is
+            int ch1 = data.byteAt(0) & 0xff;
+            int ch2 = data.byteAt(1) & 0xff;
+            // the magic number is stored low byte first
+            int magic = (ch1 | (ch2 << 8));
+            result = (magic == GZIPInputStream.GZIP_MAGIC);
+        }
+        return result;
+    }
+
+    /**
+     * @return the compressionLevel
+     */
+    public int getCompressionLevel() {
+        return this.compressionLevel;
+    }
+
+    /**
+     * @param compressionLevel
+     *            the compressionLevel to set These are the values passed to the Deflater - 1 for best speed, 9 for best
+     *            compression (the default)
+     */
+    public void setCompressionLevel(int compressionLevel) {
+        this.compressionLevel = compressionLevel;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Fragments a packet.
+ * <p>
+ * On the way down, a packet whose serialized size exceeds
+ * {@link #getMaxPacketSize()} is split into several packets that each carry
+ * a slice of the payload, the total number of parts and their own part
+ * number. On the way up the parts are collected per packet id and handed on
+ * as one packet once all of them have arrived.
+ */
+
+public class FragmentationProcessor extends ChainedProcessor {
+    private static final Log LOG = LogFactory.getLog(FragmentationProcessor.class);
+    private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
+    private int maxCacheSize = 16 * 1024;
+    // packet id -> one slot per part, null until that part has arrived;
+    // guarded by synchronizing on the map itself
+    private Map<String,List<Packet>> cache = new LinkedHashMap<String,List<Packet>>() {
+        protected boolean removeEldestEntry(Map.Entry<String,List<Packet>> eldest) {
+            return removeEldestCacheEntry(eldest.getKey());
+        }
+    };
+  
+        
+    /**
+     * Passes the packet on, split into fragments if it is too big.
+     * 
+     * @throws IllegalStateException if the packet without its payload
+     *             already fills the maximum packet size, so that no payload
+     *             could be carried by a fragment
+     */
+    public void downStream(Packet packet) throws Exception {
+        int size = packet.getPacketData().serializedSizeUnframed();
+        Buffer payload = packet.getPacketData().getPayload();
+        if (size <= getMaxPacketSize() || payload == null || payload.length == 0) {
+            // either it fits, or there is no payload that could be split
+            super.downStream(packet);
+            return;
+        }
+        packet.getPacketData().clearPayload();
+        // clone so that the size is worked out without the payload
+        Packet template = packet.clone();
+        int headerSize = template.getPacketData().serializedSizeUnframed();
+        int fragmentSize = getMaxPacketSize() - headerSize;
+        if (fragmentSize <= 0) {
+            // give the caller its packet back the way it was handed in
+            packet.getPacketData().setPayload(payload);
+            throw new IllegalStateException("Cannot fragment packet " + packet.getId() + ": header of "
+                    + headerSize + " bytes leaves no room for payload in a max packet size of "
+                    + getMaxPacketSize());
+        }
+        // the payload may be a window into a bigger array, so all positions
+        // are worked out from its offset
+        int offset = payload.offset;
+        int end = payload.offset + payload.length;
+        int numberOfParts = payload.length / fragmentSize;
+        if (payload.length % fragmentSize != 0) {
+            numberOfParts++;
+        }
+        int partNumber = 0;
+        while (offset < end) {
+            // the last part only carries what is left
+            int partLength = Math.min(fragmentSize, end - offset);
+            Buffer nextPayload = new Buffer(payload.data, offset, partLength);
+            offset += partLength;
+            Packet next = template.clone();
+            next.getPacketData().setPayload(nextPayload);
+            next.getPacketData().setNumberOfParts(numberOfParts);
+            next.getPacketData().setPartNumber(partNumber);
+            partNumber++;
+            super.downStream(next);
+        }
+    }
+
+    /**
+     * Passes unfragmented packets straight on. Fragments are kept until all
+     * parts of their packet have arrived, in any order; the reassembled
+     * packet is then passed on. A fragment that arrives twice replaces its
+     * earlier copy, a fragment with a part number outside of its declared
+     * number of parts is discarded.
+     */
+    public void upStream(Packet packet) throws Exception {
+        int numberOfParts = packet.getPacketData().getNumberOfParts();
+        if (numberOfParts <= 1) {
+            super.upStream(packet);
+            return;
+        }
+        int partNumber = packet.getPacketData().getPartNumber();
+        if (partNumber < 0 || partNumber >= numberOfParts) {
+            LOG.warn("Discarding fragment with invalid part number " + partNumber + " of " + numberOfParts
+                    + " for " + packet.getId());
+            return;
+        }
+        synchronized(this.cache) {
+            List<Packet> parts = this.cache.get(packet.getId());
+            if (parts == null || parts.size() != numberOfParts) {
+                // first fragment for this id, or what was collected so far
+                // disagrees about the number of parts: start afresh
+                parts = new ArrayList<Packet>(numberOfParts);
+                for (int i = 0; i < numberOfParts; i++) {
+                    parts.add(null);
+                }
+                this.cache.put(packet.getId(), parts);
+            }
+            // a slot per part, so that the order of arrival does not matter
+            parts.set(partNumber, packet);
+            if (!parts.contains(null)) {
+                Packet result = packet.clone();
+                result.getPacketData().clearPayload();
+                result.getPacketData().clearNumberOfParts();
+                result.getPacketData().clearPartNumber();
+                result.getPacketData().setNumberOfParts(1);
+                result.getPacketData().setPartNumber(0);
+                int size=0;
+                for (Packet p:parts) {
+                    Buffer src = p.getPacketData().getPayload();
+                    if (src != null) {
+                        size+= src.length;
+                    }
+                }
+                byte[] data = new byte[size];
+                int offset=0;
+                for (Packet p:parts) {
+                    Buffer src = p.getPacketData().getPayload();
+                    if (src != null) {
+                        System.arraycopy(src.data, src.offset, data, offset, src.length);
+                        offset+=src.length;
+                    }
+                }
+                result.getPacketData().setPayload(new Buffer(data));
+                this.cache.remove(packet.getId());
+                super.upStream(result);
+            }
+        }
+    }
+
+    /**
+     * @return the maxPacketSize
+     */
+    public int getMaxPacketSize() {
+        return this.maxPacketSize;
+    }
+
+    /**
+     * @param maxPacketSize the maxPacketSize to set
+     */
+    public void setMaxPacketSize(int maxPacketSize) {
+        this.maxPacketSize = maxPacketSize;
+    }
+
+    /**
+     * @return the maxCacheSize
+     */
+    public int getMaxCacheSize() {
+        return this.maxCacheSize;
+    }
+
+    /**
+     * @param maxCacheSize the maxCacheSize to set
+     */
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+    }
+    
+    /**
+     * Called by the cache after an entry was added, to decide whether the
+     * oldest entry has to go.
+     * 
+     * @param id the packet id of the oldest entry
+     * @return true if the cache holds more than the maximum number of
+     *         entries
+     */
+    protected boolean removeEldestCacheEntry(String id) {
+        boolean result = false;
+        if (this.cache.size()> getMaxCacheSize()) {
+            result = true;
+            LOG.warn("Cache too big - Discarding fragmented packets for " + id);
+        }
+        return result;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * Wrapper for PacketData.
+ * <p>
+ * Holds the data together with the address it came from or is to be sent
+ * to, and the message id of the data as a String, if it has one.
+ */
+public final class Packet {
+    final private SocketAddress from;
+    private InetSocketAddress to;
+    final private String id;
+    final private PacketData packetData;
+
+    /**
+     * Internal Constructor
+     * 
+     * @param from
+     * @param to
+     * @param id
+     * @param data
+     */
+    private Packet(SocketAddress from, InetSocketAddress to, String id, PacketData data) {
+        this.from = from;
+        this.to = to;
+        this.id = id;
+        this.packetData = data;
+    }
+
+    /**
+     * Construct a Packet from PacketData
+     * 
+     * @param data
+     */
+    public Packet(PacketData data) {
+        this(null, null, idOf(data), data);
+    }
+
+    /**
+     * Construct a Packet received
+     * 
+     * @param from
+     * @param data
+     */
+    public Packet(SocketAddress from, PacketData data) {
+        this(from, null, idOf(data), data);
+    }
+
+    /**
+     * Construct a Packet to send
+     * 
+     * @param toAddress
+     * @param toPort
+     * @param data
+     */
+    public Packet(InetAddress toAddress, int toPort, PacketData data) {
+        this(null, new InetSocketAddress(toAddress, toPort), idOf(data), data);
+    }
+
+    /**
+     * Construct a Packet to send
+     * 
+     * @param toAddress
+     * @param toPort
+     * @param data
+     */
+    public Packet(String toAddress, int toPort, PacketData data) {
+        this(null, new InetSocketAddress(toAddress, toPort), idOf(data), data);
+    }
+
+    /**
+     * @param data the data to take the id from
+     * @return the message id of the data as a String, or null if the data
+     *         has no message id
+     */
+    private static String idOf(PacketData data) {
+        return data.hasMessageId() ? data.getMessageId().toString() : null;
+    }
+    
+    public String toString() {
+        StringBuilder builder = new StringBuilder("Packet:");
+        builder.append(getId());
+        builder.append("[");
+        builder.append(getPacketData().toString());
+        builder.append("]");
+        return builder.toString();
+    }
+
+    /**
+     * @return the id, null if the data had no message id
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @return the packetData
+     */
+    public PacketData getPacketData() {
+        return this.packetData;
+    }
+
+    /**
+     * @return a Packet with the same id and addresses and a clone of the
+     *         packetData
+     */
+    public Packet clone() {
+        // the addresses go with the copy, so that a packet that was cloned
+        // on its way through the processors can still be delivered
+        return new Packet(this.from, this.to, this.id, this.packetData.clone());
+    }
+
+    /**
+     * @return the from
+     */
+    public SocketAddress getFrom() {
+        return this.from;
+    }
+
+    /**
+     * @return the to
+     */
+    public InetSocketAddress getTo() {
+        return this.to;
+    }
+
+    /**
+     * @param to the to to set
+     */
+    public void setTo(InetSocketAddress to) {
+        this.to = to;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.util.BitArrayBin;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+
+/**
+ * Checks for duplicate packets.
+ * <p>
+ * One {@link BitArrayBin} is kept per producer key, in a map bounded to
+ * {@link #getMaxChannels()} entries (the eldest inserted entry is evicted
+ * once the bound is exceeded). The map exists only between
+ * {@link #start()} and {@link #shutDown()}; outside that window every
+ * check reports "not a duplicate".
+ */
+public class PacketAudit extends BaseService {
+    // volatile: written by start()/shutDown() and read without a lock by
+    // isDuplicate(), which then synchronizes on the map snapshot itself.
+    private volatile LinkedHashMap<Buffer, BitArrayBin> cache;
+    private int maxChannels = 256;
+    private int maxAuditDepth = 1024;
+
+    /**
+     * Shuts the service down and, if the superclass reports success,
+     * discards the audit cache.
+     *
+     * @return the result of the superclass shut down
+     * @throws Exception if the superclass shut down fails
+     */
+    @Override
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result) {
+            this.cache = null;
+        }
+        return result;
+    }
+
+    /**
+     * Starts the service and, if the superclass reports success, creates
+     * the audit cache unless one already exists.
+     *
+     * @return the result of the superclass start
+     * @throws Exception if the superclass start fails
+     */
+    @Override
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            if (this.cache == null) {
+                this.cache = new LinkedHashMap<Buffer, BitArrayBin>() {
+                    // maxChannels is read on every insertion, so a later
+                    // setMaxChannels() call takes effect on the live cache
+                    @Override
+                    protected boolean removeEldestEntry(
+                            Map.Entry<Buffer, BitArrayBin> eldest) {
+                        return size() > getMaxChannels();
+                    }
+                };
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return the maximum number of producer keys tracked at once
+     */
+    public int getMaxChannels() {
+        return this.maxChannels;
+    }
+
+    /**
+     * @param maxChannels
+     *            the maximum number of producer keys to track at once
+     */
+    public void setMaxChannels(int maxChannels) {
+        this.maxChannels = maxChannels;
+    }
+
+    /**
+     * Tests a packet for duplication, using the producer id and message
+     * sequence of its packet data.
+     * 
+     * @param packet
+     *            the packet to test
+     * @return the result of the keyed duplicate check for that producer id
+     *         and sequence
+     */
+    public boolean isDuplicate(Packet packet) {
+        PacketData data = packet.getPacketData();
+        return isDuplicate(data.getProducerId(), data.getMessageSequence());
+    }
+
+    /**
+     * Tests for a duplicate message, keyed by a producer id given as a
+     * String.
+     * 
+     * @param producerId
+     *            the producer id; wrapped in a {@link Buffer} to form the
+     *            cache key
+     * @param id
+     *            the message sequence to test
+     * @return the result of the {@link Buffer}-keyed duplicate check
+     */
+    public boolean isDuplicate(String producerId, long id) {
+        return isDuplicate(new Buffer(producerId), id);
+    }
+
+    /**
+     * Tests for a duplicate message and records the id as seen. A bin is
+     * created for the key on first use, sized by
+     * {@link #getMaxAuditDepth()}.
+     * 
+     * @param key
+     *            the producer key
+     * @param id
+     *            the message sequence to test
+     * @return the value returned by <code>BitArrayBin.setBit(id, true)</code>
+     *         for the key's bin (presumably the previous state of the bit,
+     *         i.e. <code>true</code> when the id was already recorded -
+     *         confirm against BitArrayBin); always <code>false</code> when
+     *         no cache exists
+     */
+    public boolean isDuplicate(Buffer key, long id) {
+        boolean result = false;
+        // snapshot the field once so a concurrent shutDown() cannot null it
+        // between the check and the use
+        LinkedHashMap<Buffer, BitArrayBin> theCache = this.cache;
+        if (theCache != null) {
+            synchronized (theCache) {
+                BitArrayBin bin = theCache.get(key);
+                if (bin == null) {
+                    bin = new BitArrayBin(getMaxAuditDepth());
+                    theCache.put(key, bin);
+                }
+                result = bin.setBit(id, true);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return the size passed to each newly created {@link BitArrayBin}
+     */
+    public int getMaxAuditDepth() {
+        return this.maxAuditDepth;
+    }
+
+    /**
+     * @param maxAuditDepth
+     *            the size to pass to each newly created {@link BitArrayBin};
+     *            bins that already exist are not resized
+     */
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import org.apache.activeblaze.wire.MessageType;
+
+/**
+ * Exposes the {@link MessageType} associated with a packet.
+ *
+ * @author rajdavies
+ *
+ */
+public interface PacketMessageType {
+    /**
+     * Reports the message type.
+     *
+     * @return the type of Packet
+     */
+    MessageType type();
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Processors for packets sent on the network
+
+</body>
+</html>
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable;
+
+import java.util.Map;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.util.ObjectFinder;
+import org.apache.activeblaze.util.PropertyUtil;
+
+/**
+ * Locates and configures a reliability {@link ChainedProcessor}
+ * implementation by scheme name.
+ *
+ */
+public class ReliableFactory {
+
+    private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/reliable/");
+
+    /**
+     * Instantiates the processor for the scheme named in the location and
+     * applies the parameters parsed from the location to it.
+     *
+     * @param location
+     *            the scheme, optionally followed by parameters (presumably
+     *            of the form <code>scheme?name=value</code> - confirm
+     *            against PropertyUtil)
+     * @return the configured reliability processor
+     * @throws Exception
+     *             if no scheme is given or the processor cannot be created
+     *             or configured
+     */
+    public static ChainedProcessor get(String location) throws Exception {
+        ChainedProcessor processor = findReliable(location);
+        configure(processor, location);
+        return processor;
+    }
+
+    /**
+     * Applies the parameters parsed from the location to the processor.
+     */
+    static void configure(ChainedProcessor transport, String location) throws Exception {
+        Map<String, String> params = PropertyUtil.parseParameters(location);
+        PropertyUtil.setProperties(transport, params);
+    }
+
+    /**
+     * Creates the processor registered under the scheme, i.e. the part of
+     * the location that PropertyUtil.stripBefore yields for '?'.
+     */
+    private static ChainedProcessor findReliable(String location) throws Exception {
+        String scheme = PropertyUtil.stripBefore(location, '?');
+        if (scheme != null) {
+            return (ChainedProcessor) OBJECT_FINDER.newInstance(scheme);
+        }
+        throw new IllegalArgumentException("Reliability scheme not specified: [" + location + "]");
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.flow;
+
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * Simple FlowControl
+ * 
+ */
+public class SimpleFlow extends ChainedProcessor {
+    int maxWindowSize = 4 * 1024;
+    int windowSize = 0;
+    int pauseTime = 2;
+
+    public void downStream(Packet p) throws Exception {
+        this.windowSize += p.getPacketData().serializedSizeFramed();
+        if (this.windowSize >= this.maxWindowSize) {
+            Thread.sleep(this.pauseTime);
+            this.windowSize = 0;
+        }
+        super.downStream(p);
+    }
+
+    /**
+     * @return the maxWindowSize
+     */
+    public int getMaxWindowSize() {
+        return this.maxWindowSize;
+    }
+
+    /**
+     * @param maxWindowSize
+     *            the maxWindowSize to set
+     */
+    public void setMaxWindowSize(int maxWindowSize) {
+        this.maxWindowSize = maxWindowSize;
+    }
+
+    /**
+     * @return the pauseTime
+     */
+    public int getPauseTime() {
+        return this.pauseTime;
+    }
+
+    /**
+     * @param pauseTime
+     *            the pauseTime to set
+     */
+    public void setPauseTime(int pauseTime) {
+        this.pauseTime = pauseTime;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.simple;
+
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.reliable.flow.SimpleFlow;
+
+/**
+ * Very basic (none) reliability
+ *
+ */
+public class SimpleReliableProcessor extends ChainedProcessor{
+    
+   private SimpleFlow simpleFlow;
+   
+   /**
+     * Constructor
+     */
+    public SimpleReliableProcessor() {
+       this.simpleFlow=new SimpleFlow();
+       setEnd(this.simpleFlow);
+   }
+   
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.URI;
+import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activeblaze.impl.processor.PacketAudit;
+
+/**
+ * Base class for transports. Holds the configuration shared by transport
+ * implementations and owns a {@link PacketAudit} whose lifecycle follows
+ * the transport's own: each lifecycle call is forwarded to the audit only
+ * when the superclass call returned <code>true</code>.
+ *
+ */
+public abstract class BaseTransport extends ThreadChainedProcessor {
+    static final int DEFAULT_MAX_PACKET_SIZE = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
+    static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+    protected final PacketAudit audit = new PacketAudit();
+    private URI localURI;
+    private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private int soTimeout = 2000;
+    private int timeToLive = 1;
+    private boolean loopBack;
+    private boolean broadcast;
+    private boolean enableAudit;
+
+    /**
+     * Initialises the transport and then its audit.
+     *
+     * @return the result of the superclass initialisation
+     * @throws Exception if either initialisation fails
+     */
+    public boolean init() throws Exception {
+        if (!super.init()) {
+            return false;
+        }
+        this.audit.init();
+        return true;
+    }
+
+    /**
+     * Shuts the transport down and then its audit.
+     *
+     * @return the result of the superclass shut down
+     * @throws Exception if either shut down fails
+     */
+    public boolean shutDown() throws Exception {
+        if (!super.shutDown()) {
+            return false;
+        }
+        this.audit.shutDown();
+        return true;
+    }
+
+    /**
+     * Starts the transport and then its audit.
+     *
+     * @return the result of the superclass start
+     * @throws Exception if either start fails
+     */
+    public boolean start() throws Exception {
+        if (!super.start()) {
+            return false;
+        }
+        this.audit.start();
+        return true;
+    }
+
+    /**
+     * Stops the transport and then its audit.
+     *
+     * @return the result of the superclass stop
+     * @throws Exception if either stop fails
+     */
+    public boolean stop() throws Exception {
+        if (!super.stop()) {
+            return false;
+        }
+        this.audit.stop();
+        return true;
+    }
+
+    /**
+     * @return the local URI of this transport
+     */
+    public URI getLocalURI() {
+        return localURI;
+    }
+
+    /**
+     * @param localURI the local URI of this transport
+     */
+    public void setLocalURI(URI localURI) {
+        this.localURI = localURI;
+    }
+
+    /**
+     * @return the maximum packet size
+     */
+    public int getMaxPacketSize() {
+        return maxPacketSize;
+    }
+
+    /**
+     * @param maxPacketSize the maximum packet size
+     */
+    public void setMaxPacketSize(int maxPacketSize) {
+        this.maxPacketSize = maxPacketSize;
+    }
+
+    /**
+     * @return the buffer size
+     */
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    /**
+     * @param bufferSize the buffer size
+     */
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * @return the socket timeout setting
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * @param soTimeout the socket timeout setting
+     */
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    /**
+     * @return the time-to-live setting
+     */
+    public int getTimeToLive() {
+        return timeToLive;
+    }
+
+    /**
+     * @param timeToLive the time-to-live setting
+     */
+    public void setTimeToLive(int timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    /**
+     * @return whether loop back is enabled
+     */
+    public boolean isLoopBack() {
+        return loopBack;
+    }
+
+    /**
+     * @param loopBack whether loop back is enabled
+     */
+    public void setLoopBack(boolean loopBack) {
+        this.loopBack = loopBack;
+    }
+
+    /**
+     * @return the packet audit owned by this transport
+     */
+    protected PacketAudit getAudit() {
+        return audit;
+    }
+
+    /**
+     * @return whether broadcast is enabled
+     */
+    public boolean isBroadcast() {
+        return broadcast;
+    }
+
+    /**
+     * @param broadcast
+     *            whether broadcast is enabled
+     */
+    public void setBroadcast(boolean broadcast) {
+        this.broadcast = broadcast;
+    }
+
+    /**
+     * @return the enableAudit flag; it is only stored here and does not
+     *         affect the audit lifecycle calls made by this class
+     */
+    public boolean isEnableAudit() {
+        return enableAudit;
+    }
+
+    /**
+     * @param enableAudit the enableAudit flag to store
+     */
+    public void setEnableAudit(boolean enableAudit) {
+        this.enableAudit = enableAudit;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native