You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC
svn commit: r719706 [4/6] - in /activemq/activemq-blaze: ./ branches/ tags/
trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/
trunk/src/main/java/org/ trunk/src/main/java/org/apache/
trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Maintains members of a group
+ *
+ */
+public class Group extends BaseService {
+    static final Log LOG = LogFactory.getLog(Group.class);
+    final BlazeGroupChannelImpl channel;
+    private final BlazeGroupConfiguration configuration;
+    private ScheduledExecutorService heartBeatService;
+    private ScheduledExecutorService checkMembershipService;
+    /** Known members keyed by member id; its monitor is also used to signal that a member has started. */
+    protected Map<String, MemberImpl> members = new ConcurrentHashMap<String, MemberImpl>();
+    private final List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
+    /** Members indexed by each queue destination they advertise. */
+    private final Map<Buffer, List<MemberImpl>> queueMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
+    /** Members indexed by each topic destination they advertise. */
+    private final Map<Buffer, List<MemberImpl>> topicMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
+
+    /**
+     * Constructor
+     *
+     * @param channel the channel this group belongs to; it also supplies the group configuration
+     */
+    protected Group(BlazeGroupChannelImpl channel) {
+        this.channel = channel;
+        this.configuration = channel.getGroupConfiguration();
+    }
+
+    /**
+     * @return the Member of the Channel
+     * @throws Exception
+     */
+    public MemberImpl getLocalMember() throws Exception {
+        return this.channel.getLocalMember();
+    }
+
+    /**
+     * Store (or replace) the given member in the member map under its own id.
+     *
+     * @param member the member to store
+     */
+    void updateLocal(MemberImpl member) {
+        this.members.put(member.getId(), member);
+    }
+
+    /**
+     * @return the id of the local channel associated with this group
+     */
+    public String getId() {
+        return this.channel.getId();
+    }
+
+    /**
+     * @return the name of the local channel
+     */
+    public String getName() {
+        return this.channel.getName();
+    }
+
+    /**
+     * @return the configuration associated with this channel
+     */
+    public BlazeGroupConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * Register a listener that is told when members start and stop.
+     *
+     * @param l the listener to add
+     */
+    void addMemberChangedListener(MemberChangedListener l) {
+        this.membershipListeners.add(l);
+    }
+
+    /**
+     * Unregister a listener previously added with
+     * {@link #addMemberChangedListener(MemberChangedListener)}.
+     *
+     * @param l the listener to remove
+     */
+    void removeMemberChangedListener(MemberChangedListener l) {
+        // the original code called add() here, so listeners could never be removed
+        this.membershipListeners.remove(l);
+    }
+
+    /**
+     * @return a snapshot copy of the current members
+     */
+    Set<Member> getMembers() {
+        return new HashSet<Member>(this.members.values());
+    }
+
+    /**
+     * @return a snapshot copy of the current members of this group
+     */
+    public Set<MemberImpl> getMembersImpl() {
+        return new HashSet<MemberImpl>(this.members.values());
+    }
+
+    /**
+     * Get a member by its unique id
+     *
+     * @param id
+     * @return the member registered under the id, or null if there is none
+     */
+    Member getMemberById(String id) {
+        return this.members.get(id);
+    }
+
+    /**
+     * Return a member of the Group with the matching name
+     *
+     * @param name
+     * @return the first member found with that name, or null if the name is
+     *         null or no member matches
+     */
+    Member getMemberByName(String name) {
+        if (name != null) {
+            for (Member member : this.members.values()) {
+                // compare from the non-null side so a member without a name cannot cause an NPE
+                if (name.equals(member.getName())) {
+                    return member;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Will wait for a member to advertise itself if not available
+     *
+     * @param name the name of the member to look for
+     * @param timeout maximum time to wait in milliseconds; 0 waits until the
+     *        member appears
+     * @return the member, or null if a positive timeout elapsed first
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
+        final boolean bounded = timeout > 0;
+        final long deadline = bounded ? System.currentTimeMillis() + timeout : 0L;
+        // Look up and wait under the same monitor that processMemberStarted() notifies on,
+        // so a member arriving between the lookup and the wait is not missed.
+        synchronized (this.members) {
+            while (true) {
+                Member result = getMemberByName(name);
+                if (result != null) {
+                    return result;
+                }
+                long remaining = timeout;
+                if (bounded) {
+                    remaining = deadline - System.currentTimeMillis();
+                    if (remaining <= 0) {
+                        // deadline reached: give up rather than calling wait(0), which blocks forever
+                        return null;
+                    }
+                }
+                this.members.wait(remaining);
+            }
+        }
+    }
+
+    /**
+     * Initialises the service and, on success, registers the local member
+     * under the channel id.
+     *
+     * @return the result of the super class init()
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#init()
+     */
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.members.put(this.channel.getId(), this.channel.getLocalMember());
+        }
+        return result;
+    }
+
+    /**
+     * @return the result of the super class shutDown()
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#shutDown()
+     */
+    public boolean shutDown() throws Exception {
+        return super.shutDown();
+    }
+
+    /**
+     * Starts the service and, on success, sends one heartbeat immediately,
+     * then schedules the periodic heartbeat and the periodic membership check.
+     *
+     * @return the result of the super class start()
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            this.heartBeatService = Executors.newScheduledThreadPool(1, daemonThreadFactory());
+            Runnable heartbeat = new Runnable() {
+                public void run() {
+                    try {
+                        broadcastHeartBeat(getLocalMember());
+                    } catch (Exception e) {
+                        LOG.error("Failed to send heartbeat",e);
+                    }
+                }
+            };
+            // send the first heartbeat on the calling thread before the schedule starts
+            heartbeat.run();
+            int interval = this.configuration.getHeartBeatInterval();
+            this.heartBeatService.scheduleAtFixedRate(heartbeat, interval, interval, TimeUnit.MILLISECONDS);
+            this.checkMembershipService = Executors.newScheduledThreadPool(1, daemonThreadFactory());
+            Runnable checkMembership = new Runnable() {
+                public void run() {
+                    if (isStarted()) {
+                        try {
+                            checkMembership();
+                        } catch (Exception e) {
+                            LOG.error("Failed to checkMembership", e);
+                        }
+                    }
+                }
+            };
+            this.checkMembershipService.scheduleAtFixedRate(checkMembership, interval / 3, interval / 2,
+                    TimeUnit.MILLISECONDS);
+        }
+        return result;
+    }
+
+    /**
+     * Stops the service and, on success, shuts down both schedulers.
+     *
+     * @return the result of the super class stop()
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            if (this.heartBeatService != null) {
+                this.heartBeatService.shutdown();
+            }
+            if (this.checkMembershipService != null) {
+                this.checkMembershipService.shutdown();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Process member data. Data carrying the local member's id is ignored.
+     * Otherwise the member is time stamped and stored: a previously unknown
+     * member is announced to the listeners and sent the local member's data;
+     * a known member is replaced and, if its data says its destinations
+     * changed, the destination maps are updated.
+     *
+     * @param data
+     * @throws Exception
+     * @return Member if a new member else null
+     */
+    protected MemberImpl processMember(MemberData data) throws Exception {
+        MemberImpl result = null;
+        MemberImpl member = new MemberImpl(data);
+        if (!member.getId().equals(getLocalMember().getId())) {
+            member.setTimeStamp(System.currentTimeMillis());
+            MemberImpl old = this.members.put(member.getId(), member);
+            if (old == null) {
+                processMemberStarted(member);
+                if (!member.getId().equals(this.channel.getId())) {
+                    this.channel.sendMessage(member.getAddress(), MessageType.MEMBER_DATA,
+                            this.channel.getLocalMember().getData());
+                }
+                result = member;
+            } else if (data.getDestinationsChanged()) {
+                processMemberUpdate(old, member);
+            }
+        }
+        return result;
+    }
+
+    /** Tell every registered listener that the member has started. */
+    private void fireMemberStarted(Member member) {
+        LOG.debug(this.channel.getName() + " Member started " + member);
+        for (MemberChangedListener l : this.membershipListeners) {
+            l.memberStarted(member);
+        }
+    }
+
+    /** Tell every registered listener that the member has stopped. */
+    private void fireMemberStopped(Member member) {
+        LOG.debug(this.channel.getName() + " Member stopped " + member);
+        for (MemberChangedListener l : this.membershipListeners) {
+            l.memberStopped(member);
+        }
+    }
+
+    /**
+     * While started, removes every member (other than the one with this
+     * group's id) whose time stamp is older than one heartbeat interval and
+     * processes it as stopped.
+     *
+     * @throws Exception
+     */
+    void checkMembership() throws Exception {
+        if (isStarted()) {
+            long checkTime = System.currentTimeMillis() - this.configuration.getHeartBeatInterval();
+            for (MemberImpl member : this.members.values()) {
+                if (!member.getId().equals(getId()) && member.getTimeStamp() < checkTime) {
+                    LOG.debug(getId() +" Member timestamp expired " + member);
+                    this.members.remove(member.getId());
+                    processMemberStopped(member);
+                }
+            }
+        }
+    }
+
+    /**
+     * Index the member's destinations, notify the listeners and wake any
+     * thread waiting in {@link #getAndWaitForMemberByName(String, int)}.
+     *
+     * @param member the member that started
+     * @throws Exception
+     */
+    protected void processMemberStarted(MemberImpl member) throws Exception {
+        processDestinationsForStarted(member);
+        fireMemberStarted(member);
+        synchronized(this.members) {
+            this.members.notifyAll();
+        }
+    }
+
+    /**
+     * Notify the listeners and remove the member from the destination maps.
+     *
+     * @param member the member that stopped
+     * @throws Exception
+     */
+    protected void processMemberStopped(MemberImpl member) throws Exception {
+        fireMemberStopped(member);
+        processDestinationsForStopped(member);
+    }
+
+    /** Pick the topic or the queue map according to the destination's topic flag. */
+    private Map<Buffer, List<MemberImpl>> destinationMapFor(DestinationData dest) {
+        return dest.getTopic() ? this.topicMap : this.queueMap;
+    }
+
+    /** Add the member to the list held for each destination in its data. */
+    private void processDestinationsForStarted(MemberImpl member) {
+        List<DestinationData> destList = member.getData().getDestinationList();
+        for (DestinationData dest : destList) {
+            Buffer key = dest.getDestination();
+            Map<Buffer, List<MemberImpl>> map = destinationMapFor(dest);
+            List<MemberImpl> subscribers = map.get(key);
+            if (subscribers == null) {
+                subscribers = new CopyOnWriteArrayList<MemberImpl>();
+                map.put(key, subscribers);
+            }
+            subscribers.add(member);
+        }
+    }
+
+    /** Remove the member from each of its destinations, dropping lists that become empty. */
+    private void processDestinationsForStopped(MemberImpl member) {
+        List<DestinationData> destList = member.getData().getDestinationList();
+        for (DestinationData dest : destList) {
+            Buffer key = dest.getDestination();
+            Map<Buffer, List<MemberImpl>> map = destinationMapFor(dest);
+            List<MemberImpl> subscribers = map.get(key);
+            if (subscribers != null) {
+                subscribers.remove(member);
+                if (subscribers.isEmpty()) {
+                    map.remove(key);
+                }
+            }
+        }
+    }
+
+    /**
+     * Replace the destinations indexed for the old member with those of the
+     * new one.
+     *
+     * @param oldMember the previously stored member
+     * @param newMember the member that replaced it
+     * @throws Exception
+     */
+    protected void processMemberUpdate(MemberImpl oldMember, MemberImpl newMember) throws Exception {
+        processDestinationsForStopped(oldMember);
+        processDestinationsForStarted(newMember);
+    }
+
+    /**
+     * @return the queueMap
+     */
+    protected Map<Buffer, List<MemberImpl>> getQueueMap() {
+        return this.queueMap;
+    }
+
+    /**
+     * @return the topicMap
+     */
+    protected Map<Buffer, List<MemberImpl>> getTopicMap() {
+        return this.topicMap;
+    }
+
+    /**
+     * While started, broadcast the given member's data as a MEMBER_DATA
+     * message on the channel.
+     *
+     * @param local the member whose data is broadcast
+     * @throws Exception
+     */
+    protected void broadcastHeartBeat(MemberImpl local) throws Exception {
+        if (isStarted()) {
+            this.channel.broadcastMessage(MessageType.MEMBER_DATA, local.getData());
+        }
+    }
+
+    /** Factory producing daemon threads, shared by both schedulers. */
+    private static ThreadFactory daemonThreadFactory() {
+        return new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                return thread;
+            }
+        };
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+
+/**
+ * A <CODE>Member</CODE> holds information about a member of the group
+ *
+ */
+public interface Member {
+
+    /**
+     * @return the name of this member
+     */
+    String getName();
+
+    /**
+     * @return the id of this member
+     */
+    String getId();
+
+    /**
+     * @return the startTime of this member
+     */
+    long getStartTime();
+
+    /**
+     * @return the timeStamp currently held for this member
+     */
+    long getTimeStamp();
+
+    /**
+     * Set the timestamp
+     *
+     * @param value the new timeStamp
+     */
+    void setTimeStamp(long value);
+
+    /**
+     * @return the inbox destination of this member
+     */
+    String getInBoxDestination();
+
+    /**
+     * @return the coordinatorWeight of this member
+     */
+    long getCoordinatorWeight();
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeblaze.group;
+
+
+/**
+ * A listener for membership changes to a group
+ *
+ */
+public interface MemberChangedListener {
+
+ /**
+ * Notification a member has started
+ * @param member the member that started
+ */
+ void memberStarted(Member member);
+
+ /**
+ * Notification a member has stopped
+ * @param member the member that stopped
+ */
+ void memberStopped(Member member);
+}
\ No newline at end of file
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Implementation of a Member
+ *
+ */
+public class MemberImpl implements Member {
+    private final MemberData data;
+    private final InetSocketAddress socketAddress;
+    private final Buffer socketAddressAsBuffer;
+
+    /**
+     * Creates a member from its individual attributes. The start time is
+     * taken from the current system clock and the address is resolved from
+     * the host and port of the given URI.
+     *
+     * @param id
+     * @param name
+     * @param coordinatorWeight
+     * @param localURI
+     * @throws Exception
+     */
+    public MemberImpl(String id, String name, long coordinatorWeight, URI localURI) throws Exception {
+        final InetAddress host = InetAddress.getByName(localURI.getHost());
+        final InetSocketAddress resolved = new InetSocketAddress(host, localURI.getPort());
+        this.socketAddress = resolved;
+        this.socketAddressAsBuffer = new Buffer(resolved.toString());
+
+        final MemberData memberData = new MemberData();
+        this.data = memberData;
+        memberData.setId(id);
+        memberData.setName(name);
+        memberData.setCoordinatorWeight(coordinatorWeight);
+        memberData.setStartTime(System.currentTimeMillis());
+        memberData.setInetAddress(new Buffer(host.getHostAddress()));
+        memberData.setPort(localURI.getPort());
+    }
+
+    /**
+     * Creates a member around existing member data; the address is resolved
+     * from the inet address and port held in the data.
+     *
+     * @param data
+     * @throws Exception
+     */
+    public MemberImpl(MemberData data) throws Exception {
+        this.data = data;
+        final String hostName = data.getInetAddress().toStringUtf8();
+        final InetSocketAddress resolved = new InetSocketAddress(InetAddress.getByName(hostName), data.getPort());
+        this.socketAddress = resolved;
+        this.socketAddressAsBuffer = new Buffer(resolved.toString());
+    }
+
+    /**
+     * @return the name held in the member data
+     */
+    public String getName() {
+        return this.data.getName();
+    }
+
+    /**
+     * @return the id held in the member data
+     */
+    public String getId() {
+        return this.data.getId();
+    }
+
+    /**
+     * Overwrite the id held in the member data.
+     *
+     * @param id the new id
+     */
+    void setId(String id) {
+        this.data.setId(id);
+    }
+
+    /**
+     * @return the startTime held in the member data
+     */
+    public long getStartTime() {
+        return this.data.getStartTime();
+    }
+
+    /**
+     * @return the inbox destination, which is the member id
+     */
+    public String getInBoxDestination() {
+        return this.data.getId();
+    }
+
+    /**
+     * @return the SocketAddress for this member
+     */
+    public InetSocketAddress getAddress() {
+        return this.socketAddress;
+    }
+
+    /**
+     * @return the string form of the socket address, wrapped in a Buffer
+     */
+    public Buffer getAddressAsBuffer() {
+        return this.socketAddressAsBuffer;
+    }
+
+    /**
+     * @return the timeStamp held in the member data
+     */
+    public long getTimeStamp() {
+        return this.data.getTimeStamp();
+    }
+
+    /**
+     * Set the timestamp
+     *
+     * @param value
+     */
+    public void setTimeStamp(long value) {
+        this.data.setTimeStamp(value);
+    }
+
+    /**
+     * @return the coordinatorWeight held in the member data
+     */
+    public long getCoordinatorWeight() {
+        return this.data.getCoordinatorWeight();
+    }
+
+    /**
+     * @return the member name followed by the id in square brackets
+     */
+    public String toString() {
+        final String name = this.data.getName();
+        final String id = this.data.getId();
+        return name + "[" + id + "]";
+    }
+
+    /**
+     * @return the hash code of the member id
+     */
+    public int hashCode() {
+        final String id = this.data.getId();
+        return id.hashCode();
+    }
+
+    /**
+     * Two members are equal when both are MemberImpl instances with equal ids.
+     */
+    public boolean equals(Object obj) {
+        if (!(obj instanceof MemberImpl)) {
+            return false;
+        }
+        final MemberImpl that = (MemberImpl) obj;
+        return this.data.getId().equals(that.data.getId());
+    }
+
+    /**
+     * @return the underlying member data
+     */
+    public MemberData getData() {
+        return this.data;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * request callback
+ *
+ */
+public interface RequestCallback {
+ /**
+ * Optionally called when a request is finished
+ * @param id the id passed along with the response that finished the request
+ */
+ void finished(Buffer id);
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Holds the state of an outstanding request
+ *
+ */
+class SendRequest {
+ private static final Log LOG = LogFactory.getLog(SendRequest.class);
+ private final AtomicBoolean done = new AtomicBoolean();
+ private Message<?> response;
+ private RequestCallback callback;
+
+ Object get(long timeout) {
+ synchronized (this.done) {
+ if (this.done.get() == false && this.response == null) {
+ try {
+ this.done.wait(timeout);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted in get("+timeout+")",e);
+ }
+ }
+ }
+ return this.response;
+ }
+
+ void put(Buffer id,Message<?> response) {
+ this.response = response;
+ cancel();
+ RequestCallback callback = this.callback;
+ if (callback != null) {
+ callback.finished(id);
+ }
+ }
+
+ void cancel() {
+ this.done.set(true);
+ synchronized (this.done) {
+ this.done.notifyAll();
+ }
+ }
+
+ void setCallback(RequestCallback callback) {
+ this.callback=callback;
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Group channel for communicating true point-to-point using unicast and
+Group membership
+
+</body>
+</html>
\ No newline at end of file
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.destination;
+
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Matches a Destination Subject to wildcards
+ *
+ */
+public final class DestinationMatch {
+ static final byte MATCH_ELEMENT = '*'; // wildcard byte tested by isMatchElement()
+ static final byte MATCH_ALL = '>'; // wildcard byte that makes isMatch() accept everything that follows
+ static final byte[] DELIMETERS = { '.', '/', '|' }; // bytes recognised by isDelimiter()
+
+ /**
+ * See if the destination matches with wild cards.
+ * Wraps both strings in a Buffer and delegates to
+ * {@link #isMatch(Buffer, Buffer)}.
+ *
+ * @param destination
+ * @param match
+ * @return true if its a match
+ */
+ public static boolean isMatch(String destination, String match) {
+ return isMatch(new Buffer(destination), new Buffer(match));
+ }
+ /**
+ * See if the destination matches with wild cards.
+ * Wraps the match string in a Buffer and delegates to
+ * {@link #isMatch(Buffer, Buffer)}.
+ *
+ * @param destination
+ * @param match
+ * @return true if its a match
+ */
+ public static boolean isMatch(Buffer destination, String match) {
+ return isMatch(destination, new Buffer(match));
+ }
+
+ /**
+ * See if the destination matches with wild cards.
+ * Wraps the destination string in a Buffer and delegates to
+ * {@link #isMatch(Buffer, Buffer)}.
+ *
+ * @param destination
+ * @param match
+ * @return true if its a match
+ */
+ public static boolean isMatch(String destination, Buffer match) {
+ return isMatch(new Buffer(destination), match);
+ }
+
+ /**
+ * See if the destination matches with wild cards.
+ * Both buffers are walked byte by byte and the wildcard bytes are
+ * honoured on either side. Two null arguments match each other; a single
+ * null argument never matches.
+ *
+ * @param destination
+ * @param match
+ * @return true if its a match
+ */
+ public static boolean isMatch(Buffer destination, Buffer match) {
+ boolean result = true;
+ boolean matchAll = false;
+ if (destination == null && match == null) {
+ return true;
+ }
+ if (destination == null || match == null) {
+ return false;
+ }
+ int destinationOffset = 0;
+ int matchOffset = 0;
+ while (destinationOffset < destination.length
+ && matchOffset < match.length) {
+ byte matchByte = match.byteAt(matchOffset);
+ byte destinationByte = destination.byteAt(destinationOffset);
+ if (matchByte != destinationByte || matchByte == MATCH_ALL || destinationByte == MATCH_ALL
+ || matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT) {
+ if (matchByte == MATCH_ALL || destinationByte == MATCH_ALL) { // '>' on either side ends the comparison as a match
+ matchAll = true;
+ break;
+ } else if ((matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT)
+ && (isMatchElement(match, matchOffset, match.length) || isMatchElement(destination,
+ destinationOffset, destination.length))) {
+ if (containsDelimeter(destination, destinationOffset, destination.length) == false
+ && containsDelimeter(match, matchOffset, match.length) == false) { // no delimiter left on either side: stop comparing
+ break;
+ } else {
+ matchOffset = offsetToNextToken(match, matchOffset, match.length); // move both sides to their next delimiter (or end)
+ destinationOffset = offsetToNextToken(destination, destinationOffset, destination.length);
+ continue;
+ }
+ } else {
+ result = false; // bytes differ and no wildcard applies
+ break;
+ }
+ }
+ matchOffset++;
+ destinationOffset++;
+ }
+ if (result // NOTE(review): in the next line && binds tighter than || - confirm the grouping is as intended
+ && (match.length != destination.length && matchOffset != match.length || destinationOffset != destination.length)
+ && !matchAll) {
+ result = isMatchAll(match, matchOffset, match.length) // only a wildcard in the remainder of either side can still match
+ || isMatchAll(destination, destinationOffset, destination.length);
+ }
+ return result;
+ }
+
+ static boolean isMatchAll(Buffer str, int offset, int count) { // true if a delimiter at offset is followed by '>', or a '>'/'*' at offset is followed only by whitespace, a delimiter or the end
+ boolean result = false;
+ if (str != null && offset < str.length) {
+ byte offByte = str.byteAt(offset);
+ byte offBytePlusOne = (byte) ((offset + 1) < str.length ? str.byteAt(offset + 1) : ' '); // ' ' stands in when there is no next byte
+ if (offset + 1 < str.length && isDelimiter(offByte)) {
+ if (offBytePlusOne == MATCH_ALL) {
+ result = true;
+ }
+ } else if ((offByte == MATCH_ALL || offByte == MATCH_ELEMENT)
+ && (isWhiteSpace(str, offset + 1, count) || isDelimiter(offBytePlusOne) || offset + 1 == str.length)) {
+ result = true;
+ }
+ }
+ return result;
+ }
+
+ static boolean isMatchElement(Buffer str, int offset, int count) { // true if the byte at offset is a '*' that stands alone between delimiters / buffer ends
+ boolean result = false;
+ if (str.byteAt(offset) == MATCH_ELEMENT) {
+ if (offset == 0 || isDelimiter(str.byteAt(offset - 1))) { // must start the buffer or follow a delimiter
+ result = ((offset + 1) >= str.length) || isDelimiter(str.byteAt(offset + 1))
+ || isWhiteSpace(str, offset + 1, count);
+ }
+ }
+ return result;
+ }
+
+ private static boolean isWhiteSpace(Buffer str, int offset, int len) { // true if every byte in [offset, len) is <= ' ' (also true for an empty range)
+ boolean result = true;
+ while ((offset < len)) {
+ if (str.byteAt(offset++) > ' ') {
+ result = false;
+ break;
+ }
+ }
+ return result;
+ }
+
+ static int offsetToNextToken(Buffer str, int offset, int len) { // index of the first delimiter at or after offset, or len if there is none
+ while (offset < len) {
+ if (isDelimiter(str.byteAt(offset)))
+ break;
+ offset++;
+ }
+ return offset;
+ }
+
+ static int offsetToNextElement(Buffer str, int offset, int len) { // index just past the next run of delimiters, or -1 if no delimiter is found
+ int result = -1;
+ int count = offset;
+ while (count < len) {
+ if (isDelimiter(str.byteAt(count))) {
+ result = ++count;
+ // check for double placed elements ...
+ while (count < len && isDelimiter(str.byteAt(count))) {
+ result = ++count;
+ }
+ break;
+ }
+ count++;
+ }
+ return result;
+ }
+
+ private static boolean containsDelimeter(Buffer str, int offset, int len) { // true if any byte in [offset, len) is a delimiter
+ boolean result = false;
+ while (offset < len) {
+ if (isDelimiter(str.byteAt(offset++))) {
+ result = true;
+ break;
+ }
+ }
+ return result;
+ }
+
+ private static final boolean isDelimiter(byte b) { // true if b is one of DELIMETERS
+ for (int i = 0; i < DELIMETERS.length; i++) {
+ if (b == DELIMETERS[i]) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+destination utility classes
+
+</body>
+</html>
\ No newline at end of file
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Chains Processors together
+ *
+ */
+public class ChainedProcessor extends BaseService implements Processor {
+ private static final Log LOG = LogFactory.getLog(ChainedProcessor.class);
+ private Processor next;
+ private Processor prev;
+ protected ExceptionListener exceptionListener;
+
+ protected ChainedProcessor() {
+ }
+
+ /**
+ * @return the next
+ */
+ public Processor getNext() {
+ return this.next;
+ }
+
+ /**
+ * Set Next at the end of the chain
+ * @param next
+ *
+ */
+ public void setEnd(Processor next) {
+ ChainedProcessor target = this;
+ Processor n = getNext();
+ while (n != null) {
+ if (n instanceof ChainedProcessor) {
+ ChainedProcessor cn = (ChainedProcessor) n;
+ target = cn;
+ n = cn.getNext();
+ }
+ }
+ if(next instanceof ChainedProcessor) {
+ target.setNextChain((ChainedProcessor) next);
+ }else {
+ target.next=next;
+ }
+ }
+
+ /**
+ * Set the next
+ * @param next
+ */
+ public void setNext(Processor next) {
+ this.next=next;
+ }
+
+ /**
+ * @return the prev
+ */
+ public Processor getPrev() {
+ return this.prev;
+ }
+
+ /**
+ * Set the next chain
+ *
+ * @param p
+ */
+ public void setNextChain(ChainedProcessor p) {
+ ChainedProcessor target = this;
+ Processor n = getNext();
+ while (n != null) {
+ if (n instanceof ChainedProcessor) {
+ ChainedProcessor cn = (ChainedProcessor) n;
+ target = cn;
+ n = cn.getNext();
+ }
+ }
+ target.next=p;
+ p.setPrev(target);
+ if (this.exceptionListener != null && p.exceptionListener == null) {
+ p.exceptionListener = this.exceptionListener;
+ }
+ }
+
+ /**
+ * @param prev
+ * the prev to set
+ */
+ public void setPrev(Processor prev) {
+ this.prev = prev;
+ }
+
+ public boolean init() throws Exception {
+ boolean result = super.init();
+ if (result && this.next != null) {
+ result = this.next.init();
+ }
+ return result;
+ }
+
+ public boolean shutDown() throws Exception {
+ boolean result = super.shutDown();
+ if (result && this.next != null) {
+ result = this.next.shutDown();
+ }
+ return result;
+ }
+
+ public boolean start() throws Exception {
+ boolean result = super.start();
+ if (result && this.next != null) {
+ result = this.next.start();
+ }
+ return result;
+ }
+
+ public boolean stop() throws Exception {
+ boolean result = super.stop();
+ if (result && this.next != null) {
+ result = this.next.stop();
+ }
+ return result;
+ }
+
+ public void downStream(Packet packet) throws Exception {
+ if (this.next != null) {
+ this.next.downStream(packet);
+ }
+ }
+
+ public void upStream(Packet packet) throws Exception {
+ if (this.prev != null) {
+ this.prev.upStream(packet);
+ }
+ }
+
+ public void setExceptionListener(ExceptionListener l) {
+ this.exceptionListener = l;
+ }
+
+ protected void fireException(String reason, Exception e) {
+ doFireException(new BlazeException(reason, e));
+ }
+
+ protected void fireException(Exception e) {
+ doFireException(new BlazeException(e));
+ }
+
+ protected void fireException(String reason) {
+ doFireException(new BlazeException(reason));
+ }
+
+ protected void doFireException(Exception e) {
+ ExceptionListener l = this.exceptionListener;
+ if (l != null) {
+ l.onException(e);
+ } else {
+ LOG.error("No exception listener - caught exception ", e);
+ }
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Compresses the payload of outgoing packets with GZIP once it reaches a
+ * configurable size, and inflates incoming payloads that start with the
+ * GZIP magic number.
+ */
+public class CompressionProcessor extends ChainedProcessor {
+    private int compressionLimit = 8192;
+    private int compressionLevel = Deflater.BEST_COMPRESSION;
+
+    /**
+     * GZIP stream that applies this processor's configured compression level.
+     */
+    private class CompressionStream extends GZIPOutputStream {
+        CompressionStream(OutputStream out, int size) throws IOException {
+            super(out, size);
+            this.def.setLevel(getCompressionLevel());
+        }
+    }
+
+    /**
+     * @return the payload size at or beyond which data will be compressed
+     */
+    public int getCompressionLimit() {
+        return this.compressionLimit;
+    }
+
+    /**
+     * @param compressionLimit the payload size at or beyond which data will
+     *            be compressed
+     */
+    public void setCompressionLimit(int compressionLimit) {
+        this.compressionLimit = compressionLimit;
+    }
+
+    /**
+     * Compresses the payload when it is non-empty and at least
+     * compressionLimit bytes long, then forwards the packet. If compression
+     * fails the exception listener is notified and the packet is forwarded
+     * with its original payload.
+     *
+     * @param packet the outgoing packet
+     * @throws Exception if the next processor fails
+     */
+    public void downStream(Packet packet) throws Exception {
+        Buffer data = packet.getPacketData().getPayload();
+        // an empty payload has nothing to compress, and a zero buffer size
+        // would be rejected by the GZIP stream constructor
+        if (data != null && data.length > 0 && data.length >= this.compressionLimit) {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(data.length);
+            try {
+                GZIPOutputStream gzipOut = new CompressionStream(bytesOut, data.length);
+                try {
+                    gzipOut.write(data.toByteArray());
+                } finally {
+                    // close even on failure so the deflater is released;
+                    // closing also writes the GZIP trailer needed below
+                    gzipOut.close();
+                }
+                byte[] result = bytesOut.toByteArray();
+                packet.getPacketData().clearPayload();
+                // need to clone to get sizing correct
+                packet = packet.clone();
+                packet.getPacketData().setPayload(new Buffer(result));
+            } catch (IOException e) {
+                fireException("Failed to deflate packet", e);
+            }
+        }
+        super.downStream(packet);
+    }
+
+    /**
+     * Inflates the payload when it looks GZIP-compressed, then forwards the
+     * packet. If inflation fails the exception listener is notified and the
+     * packet is forwarded with its payload untouched.
+     *
+     * @param packet the incoming packet
+     * @throws Exception if the previous processor fails
+     */
+    public void upStream(Packet packet) throws Exception {
+        Buffer data = packet.getPacketData().getPayload();
+        if (CompressionProcessor.isCompressed(data)) {
+            InputStream bytesIn = data.newInput();
+            try {
+                GZIPInputStream gzipIn = new GZIPInputStream(bytesIn);
+                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                try {
+                    byte[] buffer = new byte[4096];
+                    int bytesRead = 0;
+                    while ((bytesRead = gzipIn.read(buffer, 0, buffer.length)) > 0) {
+                        bytesOut.write(buffer, 0, bytesRead);
+                    }
+                } finally {
+                    // releases the inflater and closes the wrapped stream,
+                    // also when the data turns out to be corrupt
+                    gzipIn.close();
+                }
+                byte[] result = bytesOut.toByteArray();
+                packet.getPacketData().clearPayload();
+                // need to clone to get sizing correct
+                packet = packet.clone();
+                packet.getPacketData().setPayload(new Buffer(result));
+            } catch (IOException e) {
+                fireException("Failed to inflate packet", e);
+            }
+        }
+        super.upStream(packet);
+    }
+
+    /**
+     * Tests whether the first two bytes of the buffer form the GZIP magic
+     * number (read in little-endian order).
+     *
+     * @param data the payload to test, may be null
+     * @return true when the buffer holds more than two bytes and starts
+     *         with the GZIP magic number
+     */
+    static boolean isCompressed(Buffer data) {
+        boolean result = false;
+        if (data != null && data.length > 2) {
+            // NOTE(review): if Buffer.byteAt() is already relative to the
+            // buffer's offset, passing data.offset here counts it twice -
+            // confirm against the Buffer implementation
+            int ch1 = (int) (data.byteAt(data.offset) & 0xff);
+            int ch2 = (int) (data.byteAt(data.offset + 1) & 0xff);
+            int magic = (ch1 | (ch2 << 8));
+            result = (magic == GZIPInputStream.GZIP_MAGIC);
+        }
+        return result;
+    }
+
+    /**
+     * @return the compressionLevel
+     */
+    public int getCompressionLevel() {
+        return this.compressionLevel;
+    }
+
+    /**
+     * @param compressionLevel
+     *            the compressionLevel to set. These are the values passed to
+     *            the Deflater - 1 for best speed, 9 for best compression
+     *            (the default)
+     */
+    public void setCompressionLevel(int compressionLevel) {
+        this.compressionLevel = compressionLevel;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Fragments a packet
+ */
+
+public class FragmentationProcessor extends ChainedProcessor {
+ private static final Log LOG = LogFactory.getLog(FragmentationProcessor.class);
+ private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
+ private int maxCacheSize = 16 * 1024;
+ private Map<String,List<Packet>> cache = new LinkedHashMap<String,List<Packet>>() {
+ protected boolean removeEldestEntry(Map.Entry<String,List<Packet>> eldest) {
+ return removeEldestCacheEntry(eldest.getKey());
+ }
+ };
+
+
+ public void downStream(Packet packet) throws Exception {
+ int size = packet.getPacketData().serializedSizeUnframed();
+ if (size > this.getMaxPacketSize()) {
+ Buffer payload = packet.getPacketData().getPayload();
+ packet.getPacketData().clearPayload();
+ packet=packet.clone();
+ int headerSize = packet.getPacketData().serializedSizeUnframed();
+ int fragmentSize = getMaxPacketSize()-headerSize;
+ int length = payload.length;
+
+ int offset=payload.offset;
+ int numberOfParts = length/fragmentSize;
+ if (length%fragmentSize!=0) {
+ numberOfParts++;
+ }
+ int partNumber = 0;
+ while (offset < length) {
+ Buffer nextPayload = new Buffer(payload.data,offset,fragmentSize);
+ offset += fragmentSize;
+ Packet next = packet.clone();
+ next.getPacketData().setPayload(nextPayload);
+ next.getPacketData().setNumberOfParts(numberOfParts);
+ next.getPacketData().setPartNumber(partNumber);
+ partNumber++;
+ super.downStream(next);
+
+ }
+ }else {
+ super.downStream(packet);
+ }
+ }
+
+ public void upStream(Packet packet) throws Exception {
+ if (packet.getPacketData().getNumberOfParts() > 1) {
+ synchronized(this.cache) {
+ List<Packet> value = this.cache.get(packet.getId());
+ if (value == null) {
+ value = new ArrayList<Packet>(packet.getPacketData().getNumberOfParts());
+ this.cache.put(packet.getId(), value);
+ }
+ value.add(packet.getPacketData().getPartNumber(),packet);
+ if (value.size()==packet.getPacketData().getNumberOfParts()) {
+ Packet result = packet.clone();
+ result.getPacketData().clearPayload();
+ result.getPacketData().clearNumberOfParts();
+ result.getPacketData().clearPartNumber();
+ result.getPacketData().setNumberOfParts(1);
+ result.getPacketData().setPartNumber(0);
+ int size=0;
+ for (Packet p:value) {
+ size+= p.getPacketData().getPayload().length;
+ }
+ byte[] data = new byte[size];
+ int offset=0;
+ for (Packet p:value) {
+ Buffer src = p.getPacketData().getPayload();
+ System.arraycopy(src.data, src.offset, data, offset, src.length);
+ offset+=src.length;
+ }
+ result.getPacketData().setPayload(new Buffer(data));
+ this.cache.remove(packet.getId());
+ super.upStream(result);
+ }
+ }
+ } else {
+ super.upStream(packet);
+ }
+ }
+
+ /**
+ * @return the maxPacketSize
+ */
+ public int getMaxPacketSize() {
+ return this.maxPacketSize;
+ }
+
+ /**
+ * @param maxPacketSize the maxPacketSize to set
+ */
+ public void setMaxPacketSize(int maxPacketSize) {
+ this.maxPacketSize = maxPacketSize;
+ }
+
+ /**
+ * @return the maxCacheSize
+ */
+ public int getMaxCacheSize() {
+ return this.maxCacheSize;
+ }
+
+ /**
+ * @param maxCacheSize the maxCacheSize to set
+ */
+ public void setMaxCacheSize(int maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ }
+
+ protected boolean removeEldestCacheEntry(String id) {
+ boolean result = false;
+ if (this.cache.size()> getMaxCacheSize()) {
+ result = true;
+ LOG.warn("Cache too big - Discarding fragmented packets for " + id);
+ }
+ return result;
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * Wrapper for PacketData that adds the packet id and the addresses the
+ * packet was received from or is to be sent to.
+ */
+public final class Packet {
+    final private SocketAddress from;
+    private InetSocketAddress to;
+    private String id;
+    final private PacketData packetData;
+
+    /**
+     * Internal Constructor, used by clone()
+     *
+     * @param id
+     * @param data
+     */
+    private Packet(String id, PacketData data) {
+        this.id = id;
+        this.packetData = data;
+        this.from = null;
+        this.to = null;
+    }
+
+    /**
+     * Construct a Packet from PacketData
+     *
+     * @param data
+     */
+    public Packet(PacketData data) {
+        this.packetData = data;
+        this.id = idOf(data);
+        this.from = null;
+        this.to = null;
+    }
+
+    /**
+     * Construct a Packet received
+     *
+     * @param from the address the data was received from
+     * @param data
+     */
+    public Packet(SocketAddress from, PacketData data) {
+        this.from = from;
+        this.packetData = data;
+        this.id = idOf(data);
+        this.to = null;
+    }
+
+    /**
+     * Construct a Packet to send
+     *
+     * @param toAddress
+     * @param toPort
+     * @param data
+     */
+    public Packet(InetAddress toAddress, int toPort, PacketData data) {
+        this.to = new InetSocketAddress(toAddress, toPort);
+        this.packetData = data;
+        this.id = idOf(data);
+        this.from = null;
+    }
+
+    /**
+     * Construct a Packet to send
+     *
+     * @param toAddress
+     * @param toPort
+     * @param data
+     */
+    public Packet(String toAddress, int toPort, PacketData data) {
+        this.to = new InetSocketAddress(toAddress, toPort);
+        this.packetData = data;
+        this.id = idOf(data);
+        this.from = null;
+    }
+
+    /**
+     * Derives the packet id from the message id, so that every constructor
+     * treats data without a message id the same way.
+     *
+     * @return the message id as a String, or null when none is set
+     */
+    private static String idOf(PacketData data) {
+        if (data.hasMessageId()) {
+            return data.getMessageId().toString();
+        }
+        return null;
+    }
+
+    public String toString() {
+        StringBuilder builder = new StringBuilder("Packet:");
+        builder.append(getId());
+        builder.append("[");
+        builder.append(getPacketData().toString());
+        builder.append("]");
+        return builder.toString();
+    }
+
+    /**
+     * @return the id, or null when the data carried no message id
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @return the packetData
+     */
+    public PacketData getPacketData() {
+        return this.packetData;
+    }
+
+    /**
+     * Creates a new Packet with the same id and a clone of the packet data.
+     * The from and to addresses are not carried over; both are null on the
+     * returned packet.
+     *
+     * @return the copy
+     */
+    public Packet clone() {
+        // NOTE(review): dropping from/to looks unintentional for callers that
+        // clone a packet just before sending it - confirm before relying on it
+        return new Packet(this.id, this.packetData.clone());
+    }
+
+    /**
+     * @return the address this packet was received from, or null
+     */
+    public SocketAddress getFrom() {
+        return this.from;
+    }
+
+    /**
+     * @return the address this packet is to be sent to, or null
+     */
+    public InetSocketAddress getTo() {
+        return this.to;
+    }
+
+    /**
+     * @param to the address this packet is to be sent to
+     */
+    public void setTo(InetSocketAddress to) {
+        this.to = to;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.util.BitArrayBin;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+
+/**
+ * Detects duplicate packets by recording, per producer, which message
+ * sequence numbers have already been seen.
+ */
+public class PacketAudit extends BaseService {
+    private LinkedHashMap<Buffer, BitArrayBin> cache;
+    private int maxChannels = 256;
+    private int maxAuditDepth = 1024;
+
+    /**
+     * Shuts the service down and, when that returns true, discards the
+     * audit cache.
+     */
+    public boolean shutDown() throws Exception {
+        final boolean done = super.shutDown();
+        if (done) {
+            this.cache = null;
+        }
+        return done;
+    }
+
+    /**
+     * Starts the service and, when that returns true, creates the audit
+     * cache if there is none yet.
+     */
+    public boolean start() throws Exception {
+        final boolean started = super.start();
+        if (started && this.cache == null) {
+            this.cache = createCache();
+        }
+        return started;
+    }
+
+    /**
+     * Builds a map that drops its eldest entry once it holds more than
+     * maxChannels producers.
+     */
+    private LinkedHashMap<Buffer, BitArrayBin> createCache() {
+        return new LinkedHashMap<Buffer, BitArrayBin>() {
+            protected boolean removeEldestEntry(
+                    Map.Entry<Buffer, BitArrayBin> eldest) {
+                return size() > getMaxChannels();
+            }
+        };
+    }
+
+    /**
+     * @return the maximum number of producers that are tracked
+     */
+    public int getMaxChannels() {
+        return this.maxChannels;
+    }
+
+    /**
+     * @param maxChannels
+     *            the maximum number of producers to track
+     */
+    public void setMaxChannels(int maxChannels) {
+        this.maxChannels = maxChannels;
+    }
+
+    /**
+     * Tests a packet using its producer id and message sequence.
+     *
+     * @param packet the packet to test
+     * @return the result of the audit for that producer and sequence
+     */
+    public boolean isDuplicate(Packet packet) {
+        final PacketData payload = packet.getPacketData();
+        return isDuplicate(payload.getProducerId(), payload.getMessageSequence());
+    }
+
+    /**
+     * Tests a sequence number for a producer named by a String.
+     *
+     * @param producerId the producer the message came from
+     * @param id the message sequence number
+     * @return the result of the audit for that producer and sequence
+     */
+    public boolean isDuplicate(String producerId, long id) {
+        final Buffer key = new Buffer(producerId);
+        return isDuplicate(key, id);
+    }
+
+    /**
+     * Marks the sequence number as seen for the producer and reports what
+     * the audit bin returned for it. Always false while the cache does not
+     * exist (before start or after shutDown).
+     *
+     * @param key the producer the message came from
+     * @param id the message sequence number
+     * @return the value returned by the producer's bin when setting the bit
+     *         for id - presumably true when the bit was already set
+     */
+    public boolean isDuplicate(Buffer key, long id) {
+        // work on a local reference so a concurrent shutDown cannot null it mid-call
+        final LinkedHashMap<Buffer, BitArrayBin> snapshot = this.cache;
+        if (snapshot == null) {
+            return false;
+        }
+        synchronized (snapshot) {
+            BitArrayBin seen = snapshot.get(key);
+            if (seen == null) {
+                seen = new BitArrayBin(getMaxAuditDepth());
+                snapshot.put(key, seen);
+            }
+            return seen.setBit(id, true);
+        }
+    }
+
+    /**
+     * @return the size passed to each producer's audit bin
+     */
+    public int getMaxAuditDepth() {
+        return this.maxAuditDepth;
+    }
+
+    /**
+     * @param maxAuditDepth
+     *            the size passed to each newly created audit bin
+     */
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import org.apache.activeblaze.wire.MessageType;
+
+/**
+ * Implemented by objects that can report which {@link MessageType} they
+ * correspond to.
+ *
+ * @author rajdavies
+ *
+ */
+public interface PacketMessageType {
+ /**
+ * @return the type of Packet this object corresponds to
+ */
+ MessageType type();
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Processors for packets sent on the network
+
+</body>
+</html>
\ No newline at end of file
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable;
+
+import java.util.Map;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.util.ObjectFinder;
+import org.apache.activeblaze.util.PropertyUtil;
+
+/**
+ * Looks up and configures a reliability implementation from a location
+ * string made of a scheme and optional parameters.
+ */
+public class ReliableFactory {
+
+    private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/reliable/");
+
+    /**
+     * Creates the processor named by the location and applies the
+     * location's parameters to it.
+     *
+     * @param location the scheme, optionally followed by parameters
+     * @return the configured processor
+     * @throws Exception if the processor cannot be found, created or
+     *             configured
+     */
+    public static ChainedProcessor get(String location) throws Exception {
+        final ChainedProcessor processor = findReliable(location);
+        configure(processor, location);
+        return processor;
+    }
+
+    /**
+     * Parses the parameters of the location and sets them as properties on
+     * the given processor.
+     */
+    static void configure(ChainedProcessor transport, String location) throws Exception {
+        final Map<String, String> parameters = PropertyUtil.parseParameters(location);
+        PropertyUtil.setProperties(transport, parameters);
+    }
+
+    /**
+     * Instantiates the processor registered for the scheme part of the
+     * location.
+     *
+     * @throws IllegalArgumentException if no scheme can be extracted
+     */
+    private static ChainedProcessor findReliable(String location) throws Exception {
+        final String scheme = PropertyUtil.stripBefore(location, '?');
+        if (scheme != null) {
+            return (ChainedProcessor) OBJECT_FINDER.newInstance(scheme);
+        }
+        throw new IllegalArgumentException("Reliability scheme not specified: [" + location + "]");
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.flow;
+
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * Simple flow control: accumulates the framed serialized size of every packet
+ * sent downstream and, once the running total reaches the configured window
+ * size, pauses the calling thread before letting the packet through.
+ */
+public class SimpleFlow extends ChainedProcessor {
+    /** Threshold the running total must reach before a pause is inserted. */
+    int maxWindowSize = 4 * 1024;
+    /** Running total of framed packet sizes since the last pause. */
+    int windowSize = 0;
+    /** Length of the pause in milliseconds (it is handed to {@link Thread#sleep(long)}). */
+    int pauseTime = 2;
+
+    /**
+     * Account for the packet's size, pause if the window is exhausted, then
+     * forward the packet to the next processor in the chain.
+     *
+     * @param p
+     *            the packet being sent
+     * @throws Exception
+     *             if the pause is interrupted or the downstream processor fails
+     */
+    public void downStream(Packet p) throws Exception {
+        this.windowSize += p.getPacketData().serializedSizeFramed();
+        if (this.windowSize >= this.maxWindowSize) {
+            // Thread.sleep rejects negative values with an IllegalArgumentException,
+            // which would be raised before the window is reset and therefore for
+            // every following packet too; treat a negative pauseTime as "no pause".
+            Thread.sleep(Math.max(0, this.pauseTime));
+            this.windowSize = 0;
+        }
+        super.downStream(p);
+    }
+
+    /**
+     * @return the maxWindowSize
+     */
+    public int getMaxWindowSize() {
+        return this.maxWindowSize;
+    }
+
+    /**
+     * @param maxWindowSize
+     *            the maxWindowSize to set
+     */
+    public void setMaxWindowSize(int maxWindowSize) {
+        this.maxWindowSize = maxWindowSize;
+    }
+
+    /**
+     * @return the pauseTime in milliseconds
+     */
+    public int getPauseTime() {
+        return this.pauseTime;
+    }
+
+    /**
+     * @param pauseTime
+     *            the pauseTime to set, in milliseconds; negative values result in no pause
+     */
+    public void setPauseTime(int pauseTime) {
+        this.pauseTime = pauseTime;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.simple;
+
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.reliable.flow.SimpleFlow;
+
+/**
+ * Very basic processor that provides no reliability; it only installs a
+ * {@link SimpleFlow} through {@code setEnd}.
+ */
+public class SimpleReliableProcessor extends ChainedProcessor {
+
+    /** Flow-control processor handed to {@code setEnd} on construction. */
+    private SimpleFlow simpleFlow = new SimpleFlow();
+
+    /**
+     * Creates the processor and registers its {@link SimpleFlow} via {@code setEnd}.
+     */
+    public SimpleReliableProcessor() {
+        setEnd(this.simpleFlow);
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.URI;
+import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activeblaze.impl.processor.PacketAudit;
+
+/**
+ * Common base for transports: holds the shared configuration properties and
+ * drives the life cycle of a {@link PacketAudit} alongside the life cycle of
+ * the processor itself.
+ */
+public abstract class BaseTransport extends ThreadChainedProcessor {
+    static final int DEFAULT_MAX_PACKET_SIZE = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
+    static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private URI localURI;
+    private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    // NOTE(review): presumably a socket timeout in milliseconds - confirm in the concrete transports
+    private int soTimeout = 2000;
+    private int timeToLive = 1;
+    private boolean loopBack = false;
+    protected final PacketAudit audit = new PacketAudit();
+    private boolean broadcast = false;
+    // Only stored here; the life cycle methods below drive the audit regardless of this flag.
+    private boolean enableAudit = false;
+
+    /**
+     * Initialise the superclass and, when that reports true, the audit.
+     *
+     * @return the value returned by the superclass
+     * @throws Exception if either initialisation fails
+     */
+    public boolean init() throws Exception {
+        if (!super.init()) {
+            return false;
+        }
+        this.audit.init();
+        return true;
+    }
+
+    /**
+     * Shut down the superclass and, when that reports true, the audit.
+     *
+     * @return the value returned by the superclass
+     * @throws Exception if either shut down fails
+     */
+    public boolean shutDown() throws Exception {
+        if (!super.shutDown()) {
+            return false;
+        }
+        this.audit.shutDown();
+        return true;
+    }
+
+    /**
+     * Start the superclass and, when that reports true, the audit.
+     *
+     * @return the value returned by the superclass
+     * @throws Exception if either start fails
+     */
+    public boolean start() throws Exception {
+        if (!super.start()) {
+            return false;
+        }
+        this.audit.start();
+        return true;
+    }
+
+    /**
+     * Stop the superclass and, when that reports true, the audit.
+     *
+     * @return the value returned by the superclass
+     * @throws Exception if either stop fails
+     */
+    public boolean stop() throws Exception {
+        if (!super.stop()) {
+            return false;
+        }
+        this.audit.stop();
+        return true;
+    }
+
+    /**
+     * @return the local URI, or null if none has been set
+     */
+    public URI getLocalURI() {
+        return localURI;
+    }
+
+    /**
+     * @param localURI the local URI to use
+     */
+    public void setLocalURI(URI localURI) {
+        this.localURI = localURI;
+    }
+
+    /**
+     * @return the maximum packet size
+     */
+    public int getMaxPacketSize() {
+        return maxPacketSize;
+    }
+
+    /**
+     * @param maxPacketSize the maximum packet size to use
+     */
+    public void setMaxPacketSize(int maxPacketSize) {
+        this.maxPacketSize = maxPacketSize;
+    }
+
+    /**
+     * @return the buffer size
+     */
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    /**
+     * @param bufferSize the buffer size to use
+     */
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * @return the soTimeout value
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * @param soTimeout the soTimeout value to use
+     */
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    /**
+     * @return the time to live value
+     */
+    public int getTimeToLive() {
+        return timeToLive;
+    }
+
+    /**
+     * @param timeToLive the time to live value to use
+     */
+    public void setTimeToLive(int timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    /**
+     * @return true if the loopBack flag is set
+     */
+    public boolean isLoopBack() {
+        return loopBack;
+    }
+
+    /**
+     * @param loopBack the new value of the loopBack flag
+     */
+    public void setLoopBack(boolean loopBack) {
+        this.loopBack = loopBack;
+    }
+
+    /**
+     * @return the packet audit owned by this transport
+     */
+    protected PacketAudit getAudit() {
+        return audit;
+    }
+
+    /**
+     * @return true if the broadcast flag is set
+     */
+    public boolean isBroadcast() {
+        return broadcast;
+    }
+
+    /**
+     * @param broadcast the new value of the broadcast flag
+     */
+    public void setBroadcast(boolean broadcast) {
+        this.broadcast = broadcast;
+    }
+
+    /**
+     * @return true if the enableAudit flag is set
+     */
+    public boolean isEnableAudit() {
+        return enableAudit;
+    }
+
+    /**
+     * @param enableAudit the new value of the enableAudit flag
+     */
+    public void setEnableAudit(boolean enableAudit) {
+        this.enableAudit = enableAudit;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
------------------------------------------------------------------------------
svn:eol-style = native