You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:23:08 UTC
[44/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LogicalLink.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LogicalLink.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LogicalLink.java
deleted file mode 100644
index 0d095a5..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LogicalLink.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: LogicalLink.java,v 1.5 2005/05/30 16:14:34 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.InetAddress;
-import java.util.Vector;
-
-
-/**
- * Implements a logical point-to-point link between 2 entities consisting of a number of physical links.
- * Traffic is routed over any of the physical link, according to policies. Examples are: send traffic
- * over all links, round-robin, use first link for 70% of traffic, other links for the remaining 30%.
- *
- * @author Bela Ban, June 2000
- */
-public class LogicalLink implements Link.Receiver {
- Receiver receiver=null;
- final Vector links=new Vector(); // of Links
- static/*GemStoneAddition*/ final int link_to_use=0;
- GemFireTracer log=GemFireTracer.getLog(getClass());
-
-
- static/*GemStoneAddition*/ public class NoLinksAvailable extends Exception {
- private static final long serialVersionUID = -4180512062659195788L;
- @Override // GemStoneAddition
- public String toString() {
- return "LogicalLinks.NoLinksAvailable: there are no physical links available";
- }
- }
-
- static/*GemStoneAddition*/ public class AllLinksDown extends Exception {
- private static final long serialVersionUID = -2294651737997827005L;
- @Override // GemStoneAddition
- public String toString() {
- return "LogicalLinks.AllLinksDown: all physical links are currently down";
- }
- }
-
-
- public interface Receiver {
- void receive(byte[] buf);
-
- void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port);
-
- void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port);
-
- void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port, int num_hbs);
-
- void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote, int remote_port);
- }
-
-
- public LogicalLink(Receiver r) {
- receiver=r;
-
- }
-
- public LogicalLink() {
-
- }
-
-
- public void addLink(String local_addr, int local_port, String remote_addr, int remote_port) {
- Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, this);
- if(links.contains(new_link))
- log.error(ExternalStrings.LogicalLink_LOGICALLINKADD_LINK__0__IS_ALREADY_PRESENT, new_link);
- else
- links.addElement(new_link);
- }
-
-
- public void addLink(String local_addr, int local_port, String remote_addr, int remote_port,
- long timeout, long hb_interval) {
- Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, timeout, hb_interval, this);
- if(links.contains(new_link))
- log.error(ExternalStrings.LogicalLink_LOGICALLINKADD_LINK__0__IS_ALREADY_PRESENT, new_link);
- else
- links.addElement(new_link);
- }
-
-
- public void removeAllLinks() {
- Link tmp;
- for(int i=0; i < links.size(); i++) {
- tmp=(Link)links.elementAt(i);
- tmp.stop();
- }
- links.removeAllElements();
- }
-
-
- public Vector getLinks() {
- return links;
- }
-
-
- public int numberOfLinks() {
- return links.size();
- }
-
-
- public int numberOfEstablishedLinks() {
- int n=0;
-
- for(int i=0; i < links.size(); i++) {
- if(((Link)links.elementAt(i)).established())
- n++;
- }
- return n;
- }
-
-
- /**
- * Start all links
- */
- public void start() {
- Link tmp;
- for(int i=0; i < links.size(); i++) {
- tmp=(Link)links.elementAt(i);
- try {
- tmp.start();
- }
- catch(Exception ex) {
- log.error(ExternalStrings.LogicalLink_LOGICALLINKSTART_COULD_NOT_CREATE_PHYSICAL_LINK_REASON__0, ex);
- }
- }
- }
-
-
- /**
- * Stop all links
- */
- public void stop() {
- Link tmp;
- for(int i=0; i < links.size(); i++) {
- tmp=(Link)links.elementAt(i);
- tmp.stop();
- }
- }
-
-
- /**
- * Send a message to the other side
- */
- public boolean send(byte[] buf) throws AllLinksDown, NoLinksAvailable {
- Link link;
- int link_used=0;
-
- if(buf == null || buf.length == 0) {
- log.error(ExternalStrings.LogicalLink_LOGICALLINKSEND_BUF_IS_NULL_OR_EMPTY);
- return false;
- }
-
- if(links.size() == 0)
- throw new NoLinksAvailable();
-
-
-
- // current policy (make policies configurable later !): alternate between links.
- // if failure, take first link that works
- // link=(Link)links.elementAt(link_to_use);
- // if(link.send(buf)) {
- // System.out.println("Send over link #" + link_to_use + ": " + link);
- // link_to_use=(link_to_use + 1) % links.size();
- // return true;
- // }
-
- // link_used=(link_to_use + 1) % links.size();
- // while(link_used != link_to_use) {
- // link=(Link)links.elementAt(link_used);
- // if(link.send(buf)) {
- // System.out.println("Send over link #" + link_used + ": " + link);
- // link_to_use=(link_to_use + 1) % links.size();
- // return true;
- // }
- // link_used=(link_used + 1) % links.size();
- // }
-
-
-
-
- // take first available link. use other links only if first is down. if we have smaller and bigger
- // pipes, the bigger ones should be specified first (so we're using them first, and only when they
- // are not available we use the smaller ones)
- for(int i=0; i < links.size(); i++) {
- link=(Link)links.elementAt(i);
- if(link.established()) {
- if(link.send(buf)) {
- System.out.println("Send over link #" + link_used + ": " + link);
- return true;
- }
- }
- }
-
- throw new AllLinksDown();
- }
-
-
- public void setReceiver(Receiver r) {
- receiver=r;
- }
-
-
- /*-------- Interface Link.Receiver ---------*/
-
- /**
- * Receive a message from any of the physical links. That's why this and the next methods have to be
- * synchronized
- */
- public synchronized void receive(byte[] buf) {
- if(receiver != null)
- receiver.receive(buf);
- }
-
- /**
- * One of the physical links went down
- */
- public synchronized void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port) {
- if(receiver != null)
- receiver.linkDown(local, local_port, remote, remote_port);
- }
-
- /**
- * One of the physical links came up
- */
- public synchronized void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port) {
- if(receiver != null)
- receiver.linkUp(local, local_port, remote, remote_port);
- }
-
-
- /**
- * Missed one or more heartbeats. Link is not yet down, though
- */
- public synchronized void missedHeartbeat(InetAddress local, int local_port,
- InetAddress remote, int remote_port, int num_missed_hbs) {
- if(receiver != null)
- receiver.missedHeartbeat(local, local_port, remote, remote_port, num_missed_hbs);
- }
-
-
- /**
- * Heartbeat came back again (before link was taken down) after missing some heartbeats
- */
- public synchronized void receivedHeartbeatAgain(InetAddress local, int local_port,
- InetAddress remote, int remote_port) {
- if(receiver != null)
- receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);
- }
-
-
- protected/*GemStoneAddition*/ static class MyReceiver implements LogicalLink.Receiver {
-
- public void receive(byte[] buf) {
- System.out.println("<-- " + new String(buf));
- }
-
-
- /**
- * All of the physical links are down --> logical link is down too
- */
- public synchronized void linkDown(InetAddress l, int lp, InetAddress r, int rp) {
- System.out.println("** linkDown(): " + r + ':' + rp);
- }
-
- /**
- * At least 1 physical links is up again
- */
- public synchronized void linkUp(InetAddress l, int lp, InetAddress r, int rp) {
- System.out.println("** linkUp(): " + r + ':' + rp);
- }
-
- public synchronized void missedHeartbeat(InetAddress l, int lp, InetAddress r, int rp, int num) {
- //System.out.println("missedHeartbeat(): " + r + ":" + rp);
- }
-
- public synchronized void receivedHeartbeatAgain(InetAddress l, int lp, InetAddress r, int rp) {
- //System.out.println("receivedHeartbeatAgain(): " + r + ":" + rp);
- }
-
- }
-
-
-// public static void main(String[] args) {
-// LogicalLink ll=new LogicalLink();
-// String local_host, remote_host;
-// int local_port, remote_port;
-// int i=0;
-//
-// ll.setReceiver(new MyReceiver());
-//
-// if(args.length % 4 != 0 || args.length == 0) {
-// System.err.println("\nLogicalLink <link+>\nwhere <link> is " +
-// "<local host> <local port> <remote host> <remote port>\n");
-// return;
-// }
-//
-// while(i < args.length) {
-// local_host=args[i++];
-// local_port=Integer.parseInt(args[i++]);
-// remote_host=args[i++];
-// remote_port=Integer.parseInt(args[i++]);
-// ll.addLink(local_host, local_port, remote_host, remote_port);
-// }
-//
-// try {
-// ll.start();
-// }
-// catch(Exception e) {
-// System.err.println("LogicalLink.main(): " + e);
-// }
-//
-// BufferedReader in=new BufferedReader(new InputStreamReader(System.in));
-// while(true) {
-// try {
-// System.out.print("> ");
-// System.out.flush();
-// String line=in.readLine();
-// ll.send(line.getBytes());
-// }
-// catch(Exception e) {
-// System.err.println(e);
-// }
-// }
-//
-// }
-
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MembershipListenerAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MembershipListenerAdapter.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MembershipListenerAdapter.java
deleted file mode 100644
index 2c4ed69..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MembershipListenerAdapter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.Channel;
-import com.gemstone.org.jgroups.MembershipListener;
-import com.gemstone.org.jgroups.SuspectMember;
-import com.gemstone.org.jgroups.View;
-
-import java.util.HashSet;
-
-/**
- * This class provides multiplexing possibilities for {@link MembershipListener}
- * instances. Usually, we have more than one instance willing to listen to
- * membership messages. {@link PullPushAdapter} allows only one instance of
- * {@link MembershipListener} to be registered for message notification. With
- * help of this class you can overcome this limitation.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- */
-
-public class MembershipListenerAdapter implements MembershipListener {
-
- protected final HashSet membershipListeners = new HashSet();
- protected MembershipListener[] membershipListenersCache =
- new MembershipListener[0];
-
- public void channelClosing(Channel c, Exception e) {} // GemStoneAddition
-
-
- /**
- * Notify membership listeners to temporarily stop sending messages into
- * a channel. This method in turn calls same method of all registered
- * membership listener.
- */
- public void block() {
- for(int i = 0; i < membershipListenersCache.length; i++)
- membershipListenersCache[i].block();
- }
-
- /**
- * Notify membership listener that some node was suspected. This method
- * in turn passes suspected member address to all registered membership
- * listeners.
- */
- public void suspect(SuspectMember suspected_mbr) {
- for(int i = 0; i < membershipListenersCache.length; i++)
- membershipListenersCache[i].suspect(suspected_mbr);
- }
-
- /**
- * Notify membership listener that new view was accepted. This method in
- * turn passes new view to all registered membership listeners.
- */
- public void viewAccepted(View new_view) {
- for(int i = 0; i < membershipListenersCache.length; i++)
- membershipListenersCache[i].viewAccepted(new_view);
- }
-
- /**
- * Add membership listener to this adapter. This method registers
- * <code>listener</code> to be notified when membership event is generated.
- *
- * @param listener instance of {@link MembershipListener} that should be
- * added to this adapter.
- */
- public synchronized void addMembershipListener(MembershipListener listener) {
- if (membershipListeners.add(listener))
- membershipListenersCache =
- (MembershipListener[])membershipListeners.toArray(
- new MembershipListener[membershipListeners.size()]);
- }
-
- /**
- * Remove membership listener from this adapter. This method deregisters
- * <code>listener</code> from notification when membership event is generated.
- *
- * @param listener instance of {@link MembershipListener} that should be
- * removed from this adapter.
- */
- public synchronized void removeMembershipListener(MembershipListener listener) {
- if (membershipListeners.remove(listener))
- membershipListenersCache =
- (MembershipListener[])membershipListeners.toArray(
- new MembershipListener[membershipListeners.size()]);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageDispatcher.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageDispatcher.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageDispatcher.java
deleted file mode 100644
index 9375b72..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageDispatcher.java
+++ /dev/null
@@ -1,845 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: MessageDispatcher.java,v 1.44 2005/11/12 06:39:02 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.*;
-
-import java.io.Serializable;
-import java.util.Vector;
-import java.util.Collection;
-import java.util.TreeSet;
-
-
-/**
- * Provides synchronous and asynchronous message sending with request-response
- * correlation; i.e., matching responses with the original request.
- * It also offers push-style message reception (by internally using the PullPushAdapter).
- * <p>
- * Channels are simple patterns to asynchronously send a receive messages.
- * However, a significant number of communication patterns in group communication
- * require synchronous communication. For example, a sender would like to send a
- * message to the group and wait for all responses. Or another application would
- * like to send a message to the group and wait only until the majority of the
- * receivers have sent a response, or until a timeout occurred. MessageDispatcher
- * offers a combination of the above pattern with other patterns.
- * <p>
- * Used on top of channel to implement group requests. Client's <code>handle()</code>
- * method is called when request is received. Is the equivalent of RpcProtocol on
- * the application instead of protocol level.
- *
- * @author Bela Ban
- */
-public class MessageDispatcher implements RequestHandler {
- protected Channel channel=null;
- protected RequestCorrelator corr=null;
- protected MessageListener msg_listener=null;
- protected MembershipListener membership_listener=null;
- protected RequestHandler req_handler=null;
- protected ProtocolAdapter prot_adapter=null;
- protected TransportAdapter transport_adapter=null;
- protected final Collection members=new TreeSet();
- protected Address local_addr=null;
- protected boolean deadlock_detection=false;
- protected PullPushAdapter adapter=null;
- protected Serializable id=null;
- protected final GemFireTracer log = GemFireTracer.getLog(getClass());
-
-
- /**
- * Process items on the queue concurrently (RequestCorrelator). The default is to wait until the processing of an
- * item has completed before fetching the next item from the queue. Note that setting this to true may destroy the
- * properties of a protocol stack, e.g total or causal order may not be guaranteed. Set this to true only if you
- * know what you're doing !
- */
- protected boolean concurrent_processing=false;
-
-
- public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2) {
- this.channel=channel;
- prot_adapter=new ProtocolAdapter();
- if(channel != null) {
- local_addr=channel.getLocalAddress();
- channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- }
- setMessageListener(l);
- setMembershipListener(l2);
- if(channel != null) {
- channel.setUpHandler(prot_adapter);
- }
- start();
- }
-
-
- public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, boolean deadlock_detection) {
- this.channel=channel;
- this.deadlock_detection=deadlock_detection;
- prot_adapter=new ProtocolAdapter();
- if(channel != null) {
- local_addr=channel.getLocalAddress();
- channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- }
- setMessageListener(l);
- setMembershipListener(l2);
- if(channel != null) {
- channel.setUpHandler(prot_adapter);
- }
- start();
- }
-
- public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,
- boolean deadlock_detection, boolean concurrent_processing) {
- this.channel=channel;
- this.deadlock_detection=deadlock_detection;
- this.concurrent_processing=concurrent_processing;
- prot_adapter=new ProtocolAdapter();
- if(channel != null) {
- local_addr=channel.getLocalAddress();
- channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- }
- setMessageListener(l);
- setMembershipListener(l2);
- if(channel != null) {
- channel.setUpHandler(prot_adapter);
- }
- start();
- }
-
-
- public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler) {
- this(channel, l, l2);
- setRequestHandler(req_handler);
- }
-
-
- public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,
- boolean deadlock_detection) {
- this(channel, l, l2);
- this.deadlock_detection=deadlock_detection;
- setRequestHandler(req_handler);
- }
-
- public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,
- boolean deadlock_detection, boolean concurrent_processing) {
- this(channel, l, l2);
- this.deadlock_detection=deadlock_detection;
- this.concurrent_processing=concurrent_processing;
- setRequestHandler(req_handler);
- }
-
-
- /*
- * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
- * used to register under that id. This is typically used when another building block is already using
- * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
- * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
- * first block created on PullPushAdapter.
- * @param adapter The PullPushAdapter which to use as underlying transport
- * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
- * requests/responses for different building blocks on top of PullPushAdapter.
- */
- public MessageDispatcher(PullPushAdapter adapter, Serializable id,
- MessageListener l, MembershipListener l2) {
- this.adapter=adapter;
- this.id=id;
- setMembers(((Channel) adapter.getTransport()).getView().getMembers());
- setMessageListener(l);
- setMembershipListener(l2);
- PullPushHandler handler=new PullPushHandler();
- Transport tp;
-
- transport_adapter=new TransportAdapter();
- adapter.addMembershipListener(handler);
- if(id == null) // no other building block around, let's become the main consumer of this PullPushAdapter
- {
- adapter.setListener(handler);
- }
- else {
- adapter.registerListener(id, handler);
- }
-
- if((tp=adapter.getTransport()) instanceof Channel) {
- ((Channel) tp).setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- local_addr=((Channel) tp).getLocalAddress();
- }
- start();
- }
-
-
- /*
- * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
- * used to register under that id. This is typically used when another building block is already using
- * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
- * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
- * first block created on PullPushAdapter.
- * @param adapter The PullPushAdapter which to use as underlying transport
- * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
- * requests/responses for different building blocks on top of PullPushAdapter.
- * @param req_handler The object implementing RequestHandler. It will be called when a request is received
- */
- public MessageDispatcher(PullPushAdapter adapter, Serializable id,
- MessageListener l, MembershipListener l2,
- RequestHandler req_handler) {
- this.adapter=adapter;
- this.id=id;
- setMembers(((Channel) adapter.getTransport()).getView().getMembers());
- setRequestHandler(req_handler);
- setMessageListener(l);
- setMembershipListener(l2);
- PullPushHandler handler=new PullPushHandler();
- Transport tp;
-
- transport_adapter=new TransportAdapter();
- adapter.addMembershipListener(handler);
- if(id == null) // no other building block around, let's become the main consumer of this PullPushAdapter
- {
- adapter.setListener(handler);
- }
- else {
- adapter.registerListener(id, handler);
- }
-
- if((tp=adapter.getTransport()) instanceof Channel) {
- ((Channel) tp).setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774
- }
-
- start();
- }
-
-
- public MessageDispatcher(PullPushAdapter adapter, Serializable id,
- MessageListener l, MembershipListener l2,
- RequestHandler req_handler, boolean concurrent_processing) {
- this.concurrent_processing=concurrent_processing;
- this.adapter=adapter;
- this.id=id;
- setMembers(((Channel) adapter.getTransport()).getView().getMembers());
- setRequestHandler(req_handler);
- setMessageListener(l);
- setMembershipListener(l2);
- PullPushHandler handler=new PullPushHandler();
- Transport tp;
-
- transport_adapter=new TransportAdapter();
- adapter.addMembershipListener(handler);
- if(id == null) // no other building block around, let's become the main consumer of this PullPushAdapter
- {
- adapter.setListener(handler);
- }
- else {
- adapter.registerListener(id, handler);
- }
-
- if((tp=adapter.getTransport()) instanceof Channel) {
- ((Channel) tp).setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774
- }
-
- start();
- }
-
-
- /**
- * If this dispatcher is using a user-provided PullPushAdapter, then need to set the members from the adapter
- * initially since viewChange has most likely already been called in PullPushAdapter.
- */
- protected/*GemStoneAddition*/ void setMembers(Vector new_mbrs) {
- if(new_mbrs != null) {
- synchronized(members) {
- members.clear();
- members.addAll(new_mbrs);
- }
- }
- }
-
- public void setDeadlockDetection(boolean flag) {
- deadlock_detection=flag;
- if(corr != null)
- corr.setDeadlockDetection(flag);
- }
-
- public void setConcurrentProcessing(boolean flag) {
- this.concurrent_processing=flag;
- }
-
-
- public void start() {
- if(corr == null) {
- if(transport_adapter != null) {
- corr=new RequestCorrelator("MessageDispatcher", transport_adapter,
- this, deadlock_detection, local_addr, concurrent_processing);
- }
- else {
- corr=new RequestCorrelator("MessageDispatcher", prot_adapter,
- this, deadlock_detection, local_addr, concurrent_processing);
- }
- }
- corr.start();
- if(channel != null) {
- Vector tmp_mbrs=channel.getView() != null ? channel.getView().getMembers() : null;
- setMembers(tmp_mbrs);
- }
- }
-
-
- public void stop() {
- if(corr != null) {
- corr.stop();
- }
- }
-
-
- public void setMessageListener(MessageListener l) {
- msg_listener=l;
- }
-
- /**
- * Gives access to the currently configured MessageListener. Returns null if there is no
- * configured MessageListener.
- */
- public MessageListener getMessageListener() {
- return msg_listener;
- }
-
- public void setMembershipListener(MembershipListener l) {
- membership_listener=l;
- }
-
- public void setRequestHandler(RequestHandler rh) {
- req_handler=rh;
- }
-
- /**
- * Offers access to the underlying Channel.
- * @return a reference to the underlying Channel.
- */
- public Channel getChannel() {
- return channel;
- }
-
-
- public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
- if(channel != null) {
- channel.send(msg);
- }
- else
- if(adapter != null) {
- try {
- if(id != null) {
- adapter.send(id, msg);
- }
- else {
- adapter.send(msg);
- }
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled()) {
- log.error(ExternalStrings.MessageDispatcher_EXCEPTION_0, Util.print(ex));
- }
- }
- }
- else {
- if(log.isErrorEnabled()) {
- log.error(ExternalStrings.MessageDispatcher_CHANNEL__NULL);
- }
- }
- }
-
-
- /**
- * Cast a message to all members, and wait for <code>mode</code> responses. The responses are returned in a response
- * list, where each response is associated with its sender.<p> Uses <code>GroupRequest</code>.
- *
- * @param dests The members to which the message is to be sent. If it is null, then the message is sent to all
- * members
- * @param msg The message to be sent to n members
- * @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for: <ol> <li>GET_FIRST:
- * return the first response received. <li>GET_ALL: wait for all responses (minus the ones from
- * suspected members) <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp
- * size) <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once) <li>GET_N: wait for n
- * responses (may block if n > group size) <li>GET_NONE: wait for no responses, return immediately
- * (non-blocking) </ol>
- * @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses <em>or</em> timeout time.
- * @return RspList A list of responses. Each response is an <code>Object</code> and associated to its sender.
- */
- public RspList castMessage(final Vector dests, Message msg, int mode, long timeout) {
- GroupRequest _req=null;
- Vector real_dests;
- Channel tmp;
-
- // we need to clone because we don't want to modify the original
- // (we remove ourselves if LOCAL is false, see below) !
- // real_dests=dests != null ? (Vector) dests.clone() : (members != null ? new Vector(members) : null);
- if(dests != null) {
- real_dests=(Vector)dests.clone();
- }
- else {
- synchronized(members) {
- real_dests=new Vector(members);
- }
- }
-
- // if local delivery is off, then we should not wait for the message from the local member.
- // therefore remove it from the membership
- tmp=channel;
- if(tmp == null) {
- if(adapter != null && adapter.getTransport() instanceof Channel) {
- tmp=(Channel) adapter.getTransport();
- }
- }
-
- if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
- if(local_addr == null) {
- local_addr=tmp.getLocalAddress();
- }
- if(local_addr != null && real_dests != null) {
- real_dests.removeElement(local_addr);
- }
- }
-
- // don't even send the message if the destination list is empty
- if(log.isTraceEnabled())
- log.trace("real_dests=" + real_dests);
-
- if(real_dests == null || real_dests.size() == 0) {
- if(log.isTraceEnabled())
- log.trace("destination list is empty, won't send message");
- return new RspList(); // return empty response list
- }
-
- _req=new GroupRequest(msg, corr, real_dests, mode, timeout, 0);
- _req.setCaller(this.local_addr);
- _req.execute();
-
- return _req.getResults();
- }
-
-
- /**
- * Multicast a message request to all members in <code>dests</code> and receive responses via the RspCollector
- * interface. When done receiving the required number of responses, the caller has to call done(req_id) on the
- * underlyinh RequestCorrelator, so that the resources allocated to that request can be freed.
- *
- * @param dests The list of members from which to receive responses. Null means all members
- * @param req_id The ID of the request. Used by the underlying RequestCorrelator to correlate responses with
- * requests
- * @param msg The request to be sent
- * @param coll The sender needs to provide this interface to collect responses. Call will return immediately if
- * this is null
- */
- public void castMessage(final Vector dests, long req_id, Message msg, RspCollector coll) {
- Vector real_dests;
- Channel tmp;
-
- if(msg == null) {
- if(log.isErrorEnabled())
- log.error(ExternalStrings.MessageDispatcher_REQUEST_IS_NULL);
- return;
- }
-
- if(coll == null) {
- if(log.isErrorEnabled())
- log.error(ExternalStrings.MessageDispatcher_RESPONSE_COLLECTOR_IS_NULL_MUST_BE_NONNULL);
- return;
- }
-
- // we need to clone because we don't want to modify the original
- // (we remove ourselves if LOCAL is false, see below) !
- //real_dests=dests != null ? (Vector) dests.clone() : (Vector) members.clone();
- if(dests != null) {
- real_dests=(Vector)dests.clone();
- }
- else {
- synchronized(members) {
- real_dests=new Vector(members);
- }
- }
-
- // if local delivery is off, then we should not wait for the message from the local member.
- // therefore remove it from the membership
- tmp=channel;
- if(tmp == null) {
- if(adapter != null && adapter.getTransport() instanceof Channel) {
- tmp=(Channel) adapter.getTransport();
- }
- }
-
- if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
- if(local_addr == null) {
- local_addr=tmp.getLocalAddress();
- }
- if(local_addr != null) {
- real_dests.removeElement(local_addr);
- }
- }
-
- // don't even send the message if the destination list is empty
- if(real_dests.size() == 0) {
- if(log.isDebugEnabled())
- log.debug("destination list is empty, won't send message");
- return;
- }
-
- corr.sendRequest(req_id, real_dests, msg, coll);
- }
-
-
- public void done(long req_id) {
- corr.done(req_id);
- }
-
-
- /**
- * Sends a message to a single member (destination = msg.dest) and returns the response. The message's destination
- * must be non-zero !
- */
- public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
- Vector mbrs=new Vector();
- RspList rsp_list=null;
- Object dest=msg.getDest();
- Rsp rsp;
- GroupRequest _req=null;
-
- if(dest == null) {
- if(log.isErrorEnabled())
- log.error("the message's destination is null, " +
- "cannot send message");
- return null;
- }
-
- mbrs.addElement(dest); // dummy membership (of destination address)
-
- _req=new GroupRequest(msg, corr, mbrs, mode, timeout, 0);
- _req.setCaller(local_addr);
- _req.execute();
-
- if(mode == GroupRequest.GET_NONE) {
- return null;
- }
-
- rsp_list=_req.getResults();
-
- if(rsp_list.size() == 0) {
- if(log.isWarnEnabled())
- log.warn(" response list is empty");
- return null;
- }
- if(rsp_list.size() > 1) {
- if(log.isWarnEnabled())
- log.warn("response list contains more that 1 response; returning first response !");
- }
- rsp=(Rsp) rsp_list.elementAt(0);
- if(rsp.wasSuspected()) {
- throw new SuspectedException(dest);
- }
- if(!rsp.wasReceived()) {
- throw new TimeoutException();
- }
- return rsp.getValue();
- }
-
-
-// public void channelConnected(Channel channel) {
-// if(channel != null) {
-// Address new_local_addr=channel.getLocalAddress();
-// if(new_local_addr != null) {
-// this.local_addr=new_local_addr;
-//
-// if(log.isInfoEnabled()) log.info("MessageDispatcher.channelConnected()", "new local address is " + this.local_addr);
-// }
-// }
-// }
-//
-// public void channelDisconnected(Channel channel) {
-// }
-//
-// public void channelClosed(Channel channel) {
-// }
-//
-// public void channelShunned() {
-// }
-//
-// public void channelReconnected(Address addr) {
-// if(channel != null) {
-// Address new_local_addr=channel.getLocalAddress();
-// if(new_local_addr != null) {
-// this.local_addr=new_local_addr;
-//
-// if(log.isInfoEnabled()) log.info("MessageDispatcher.channelReconnected()", "new local address is " + this.local_addr);
-// }
-// }
-// }
-
-
- /* ------------------------ RequestHandler Interface ---------------------- */
- public Object handle(Message msg) {
- if(req_handler != null) {
- return req_handler.handle(msg);
- }
- else {
- return null;
- }
- }
- /* -------------------- End of RequestHandler Interface ------------------- */
-
-
-
-
-
-
- class ProtocolAdapter extends Protocol implements UpHandler {
-
-
- /* ------------------------- Protocol Interface --------------------------- */
-
- @Override // GemStoneAddition
- public String getName() {
- return "MessageDispatcher";
- }
-
- @Override // GemStoneAddition
- public void startUpHandler() {
- // do nothing, DON'T REMOVE !!!!
- }
-
- @Override // GemStoneAddition
- public void startDownHandler() {
- // do nothing, DON'T REMOVE !!!!
- }
-
-
- @Override // GemStoneAddition
- public void stopInternal() {
- // do nothing, DON'T REMOVE !!!!
- }
-
- @Override // GemStoneAddition
- protected void receiveUpEvent(Event evt) {
- }
-
- @Override // GemStoneAddition
- protected void receiveDownEvent(Event evt) {
- }
-
- /**
- * Called by request correlator when message was not generated by it. We handle it and call the message
- * listener's corresponding methods
- */
- @Override // GemStoneAddition
- public void passUp(Event evt) {
- byte[] tmp_state=null;
- switch(evt.getType()) {
- case Event.MSG:
- if(msg_listener != null) {
- msg_listener.receive((Message) evt.getArg());
- }
- break;
-
- case Event.GET_APPLSTATE: // reply with GET_APPLSTATE_OK
- if(msg_listener != null) {
- try {
- tmp_state=msg_listener.getState();
- }
- catch(Throwable t) {
- MessageDispatcher.this.log.error(ExternalStrings.MessageDispatcher_FAILED_GETTING_STATE_FROM_MESSAGE_LISTENER__0, msg_listener, t);
- }
- }
- channel.returnState(tmp_state);
- break;
-
- case Event.GET_STATE_OK:
- if(msg_listener != null) {
- try {
- msg_listener.setState((byte[]) evt.getArg());
- }
- catch(ClassCastException cast_ex) {
- if(this.log.isErrorEnabled())
- this.log.error("received SetStateEvent, but argument " +
- evt.getArg() + " is not serializable. Discarding message.");
- }
- }
- break;
-
- case Event.VIEW_CHANGE:
- View v=(View) evt.getArg();
- Vector new_mbrs=v.getMembers();
- setMembers(new_mbrs);
- if(membership_listener != null) {
- membership_listener.viewAccepted(v);
- }
- break;
-
- case Event.SET_LOCAL_ADDRESS:
- if(log.isTraceEnabled())
- log.trace("setting local_addr (" + local_addr + ") to " + evt.getArg());
- local_addr=(Address)evt.getArg();
- break;
-
- case Event.SUSPECT:
- if(membership_listener != null) {
- membership_listener.suspect((SuspectMember)evt.getArg()); // GemStoneAddition SuspectMember struct
- }
- break;
-
- case Event.BLOCK:
- if(membership_listener != null) {
- membership_listener.block();
- }
- break;
- }
- }
-
-
- @Override // GemStoneAddition
- public void passDown(Event evt) {
- down(evt);
- }
-
-
-
- /**
- * Called by channel (we registered before) when event is received. This is the UpHandler interface.
- */
- @Override // GemStoneAddition
- public void up(Event evt) {
- if(corr != null) {
- corr.receive(evt); // calls passUp()
- }
- else {
- if(log.isErrorEnabled()) { //Something is seriously wrong, correlator should not be null since latch is not locked!
- log.error(ExternalStrings.MessageDispatcher_CORRELATOR_IS_NULL_EVENT_WILL_BE_IGNORED_EVT_0_, evt);
- }
- }
- }
-
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- if(channel != null) {
- channel.down(evt);
- }
- else
- if(this.log.isWarnEnabled()) {
- this.log.warn("channel is null, discarding event " + evt);
- }
- }
- /* ----------------------- End of Protocol Interface ------------------------ */
-
- }
-
-
- class TransportAdapter implements Transport {
-
- public void send(Message msg) throws Exception {
- if(channel != null) {
- channel.send(msg);
- }
- else
- if(adapter != null) {
- try {
- if(id != null) {
- adapter.send(id, msg);
- }
- else {
- adapter.send(msg);
- }
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled()) {
- log.error(ExternalStrings.MessageDispatcher_EXCEPTION_0, Util.print(ex));
- }
- }
- }
- else {
- if(log.isErrorEnabled()) {
- log.error(ExternalStrings.MessageDispatcher_CHANNEL__NULL);
- }
- }
- }
-
- public Object receive(long timeout) throws Exception {
- // @todo: implement
- return null;
- }
- }
-
-
- class PullPushHandler implements MessageListener, MembershipListener {
-
-
- /* ------------------------- MessageListener interface ---------------------- */
- public void receive(Message msg) {
- boolean pass_up=true;
- if(corr != null) {
- pass_up=corr.receiveMessage(msg);
- }
-
- if(pass_up) { // pass on to MessageListener
- if(msg_listener != null) {
- msg_listener.receive(msg);
- }
- }
- }
-
- public byte[] getState() {
- return msg_listener != null ? msg_listener.getState() : null;
- }
-
- public void setState(byte[] state) {
- if(msg_listener != null) {
- msg_listener.setState(state);
- }
- }
- /* --------------------- End of MessageListener interface ------------------- */
-
-
- /* ------------------------ MembershipListener interface -------------------- */
- public void viewAccepted(View v) {
- if(corr != null) {
- corr.receiveView(v);
- }
-
- Vector new_mbrs=v.getMembers();
- setMembers(new_mbrs);
- if(membership_listener != null) {
- membership_listener.viewAccepted(v);
- }
- }
-
- public void suspect(SuspectMember suspected_mbr) { // GemStoneAddition SuspectMember struct
- if(corr != null) {
- corr.receiveSuspect(suspected_mbr.suspectedMember);
- }
- if(membership_listener != null) {
- membership_listener.suspect(suspected_mbr);
- }
- }
-
- public void block() {
- if(membership_listener != null) {
- membership_listener.block();
- }
- }
-
- public void channelClosing(Channel c, Exception e) {} // GemStoneAddition
-
-
- /* --------------------- End of MembershipListener interface ---------------- */
-
-
-
- // @todo: receive SET_LOCAL_ADDR event and call corr.setLocalAddress(addr)
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageListenerAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageListenerAdapter.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageListenerAdapter.java
deleted file mode 100644
index 1b11871..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MessageListenerAdapter.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.MessageListener;
-
-import java.util.HashSet;
-
-/**
- * This class provides multiplexing possibilities for {@link MessageListener}
- * instances. Usually, we have more than one instance willing to listen to
- * incoming messages, but only one that can produce state for group.
- * {@link PullPushAdapter} allows only one instance of {@link MessageListener}
- * to be registered for message notification. With help of this class you
- * can overcome this limitation.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- */
-public class MessageListenerAdapter implements MessageListener {
-
- protected MessageListener stateListener;
-
- protected final HashSet messageListeners = new HashSet();
-
- // we need this cache, because every call to messageListeners.iterator()
- // would generate few new objects, but iteration over the cache would not.
- protected MessageListener[] messageListenersCache = new MessageListener[0];
-
- /**
- * Create default instance of this class. Newly created instance will have
- * no message or state listeners. You have to use
- * {@link #addMessageListener(MessageListener)} or
- * {@link #removeMessageListener(MessageListener)} to add or remove message
- * listeners, and {@link #setStateListener(MessageListener)} to set listener
- * that will participate in state transfer.
- */
- public MessageListenerAdapter() {
- this(null);
- }
-
- /**
- * Create instance of this class. <code>mainListener</code> is a main
- * listener instance that received message notifications and can get and
- * set group state.
- *
- * @param mainListener instance of {@link MessageListener} that will
- * provide state messages.
- */
- public MessageListenerAdapter(MessageListener mainListener) {
- if (mainListener != null) {
- stateListener = mainListener;
- addMessageListener(mainListener);
- }
- }
-
- /**
- * Get state from state listener if present.
- *
- * @return current state of the group state or <code>null</code> if no state
- * listeners were registered.
- */
- public byte[] getState() {
- if (stateListener != null)
- return stateListener.getState();
- else
- return null;
- }
-
- /**
- * Receive message from group. This method will send this message to each
- * message listener that was registered in this adapter.
- *
- * @param msg message to distribute within message listeners.
- */
- public void receive(Message msg) {
- for (int i = 0; i < messageListenersCache.length; i++)
- messageListenersCache[i].receive(msg);
- }
-
- /**
- * Set state of ths group. This method will delegate call to state listener
- * if it was previously registered.
- */
- public void setState(byte[] state) {
- if (stateListener != null)
- stateListener.setState(state);
- }
-
- /**
- * Add message listener to this adapter. This method registers
- * <code>listener</code> for message notification.
- * <p>
- * Note, state notification will not be used.
- */
- public synchronized void addMessageListener(MessageListener listener) {
- if (messageListeners.add(listener))
- messageListenersCache =
- (MessageListener[])messageListeners.toArray(
- new MessageListener[messageListeners.size()]);
- }
-
- /**
- * Remove message listener from this adapter. This method deregisters
- * <code>listener</code> from message notification.
- */
- public synchronized void removeMessageListener(MessageListener listener) {
- if (messageListeners.remove(listener))
- messageListenersCache =
- (MessageListener[])messageListeners.toArray(
- new MessageListener[messageListeners.size()]);
- }
-
- /**
- * Register <code>listener</code> for state notification events. There can
- * be only one state listener per adapter.
- */
- public void setStateListener(MessageListener listener) {
- stateListener = listener;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodCall.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodCall.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodCall.java
deleted file mode 100644
index f858dda..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodCall.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-
-/**
- * A method call is the JGroups representation of a remote method.
- * It includes the name of the method (case sensitive) and a list of arguments.
- * A method call is serializable and can be passed over the wire.
- * @author Bela Ban
- * @version $Revision: 1.19 $
- */
-public class MethodCall implements Externalizable {
-
- static final long serialVersionUID=7873471327078957662L;
-
- /** The name of the method, case sensitive. */
- protected String method_name=null;
-
- /** The ID of a method, maps to a java.lang.reflect.Method */
- protected short method_id=-1;
-
- /** The arguments of the method. */
- protected Object[] args=null;
-
- /** The class types, e.g., new Class[]{String.class, int.class}. */
- protected Class[] types=null;
-
- /** The signature, e.g., new String[]{String.class.getName(), int.class.getName()}. */
- protected String[] signature=null;
-
- /** The Method of the call. */
- protected Method method=null;
-
- /** To carry arbitrary data with a method call, data needs to be serializable if sent across the wire */
- protected Map payload=null;
-
- protected static final GemFireTracer log=GemFireTracer.getLog(MethodCall.class);
-
- /** Which mode to use. */
- protected short mode=OLD;
-
- /** Infer the method from the arguments. */
- protected static final short OLD=1;
-
- /** Explicitly ship the method, caller has to determine method himself. */
- protected static final short METHOD=2;
-
- /** Use class information. */
- protected static final short TYPES=3;
-
- /** Provide a signature, similar to JMX. */
- protected static final short SIGNATURE=4;
-
- /** Use an ID to map to a method */
- protected static final short ID=5;
-
-
- /**
- * Creates an empty method call, this is always invalid, until
- * <code>setName()</code> has been called.
- */
- public MethodCall() {
- }
-
-
- public MethodCall(Method method) {
- this(method, null);
- }
-
- public MethodCall(Method method, Object[] arguments) {
- init(method);
- if(arguments != null) args=arguments;
- }
-
- /**
- *
- * @param method_name
- * @param args
- * @deprecated Use one of the constructors that take class types as arguments
- */
- @Deprecated // GemStoneAddition
- public MethodCall(String method_name, Object[] args) {
- this.method_name=method_name;
- this.mode=OLD;
- this.args=args;
- }
-
- public MethodCall(short method_id, Object[] args) {
- this.method_id=method_id;
- this.mode=ID;
- this.args=args;
- }
-
-
- public MethodCall(String method_name, Object[] args, Class[] types) {
- this.method_name=method_name;
- this.args=args;
- this.types=types;
- this.mode=TYPES;
- }
-
- public MethodCall(String method_name, Object[] args, String[] signature) {
- this.method_name=method_name;
- this.args=args;
- this.signature=signature;
- this.mode=SIGNATURE;
- }
-
- private void init(Method method) {
- this.method=method;
- this.mode=METHOD;
- method_name=method.getName();
- }
-
-
- public int getMode() {
- return mode;
- }
-
-
-
- /**
- * returns the name of the method to be invoked using this method call object
- * @return a case sensitive name, can be null for an invalid method call
- */
- public String getName() {
- return method_name;
- }
-
- /**
- * sets the name for this MethodCall and allowing you to reuse the same object for
- * a different method invokation of a different method
- * @param n - a case sensitive method name
- */
- public void setName(String n) {
- method_name=n;
- }
-
- public short getId() {
- return method_id;
- }
-
- public void setId(short method_id) {
- this.method_id=method_id;
- }
-
- /**
- * returns an ordered list of arguments used for the method invokation
- * @return returns the list of ordered arguments
- */
- public Object[] getArgs() {
- return args;
- }
-
- public void setArgs(Object[] args) {
- if(args != null)
- this.args=args;
- }
-
- public Method getMethod() {
- return method;
- }
-
-
- public void setMethod(Method m) {
- init(m);
- }
-
-
- public synchronized Object put(Object key, Object value) {
- if(payload == null)
- payload=new HashMap();
- return payload.put(key, value);
- }
-
- public synchronized Object get(Object key) {
- return payload != null? payload.get(key) : null;
- }
-
-
- /**
- *
- * @param target_class
- * @return the Method
- * @throws Exception
- */
- Method findMethod(Class target_class) throws Exception {
- int len=args != null? args.length : 0;
- Method m;
-
- Method[] methods=getAllMethods(target_class);
- for(int i=0; i < methods.length; i++) {
- m=methods[i];
- if(m.getName().equals(method_name)) {
- if(m.getParameterTypes().length == len)
- return m;
- }
- }
-
- return null;
- }
-
-
- /**
- * The method walks up the class hierarchy and returns <i>all</i> methods of this class
- * and those inherited from superclasses and superinterfaces.
- */
- Method[] getAllMethods(Class target) {
-
- Class superclass = target;
- List methods = new ArrayList();
- int size = 0;
-
- while(superclass != null) {
- Method[] m = superclass.getDeclaredMethods();
- methods.add(m);
- size += m.length;
- superclass = superclass.getSuperclass();
- }
-
- Method[] result = new Method[size];
- int index = 0;
- for(Iterator i = methods.iterator(); i.hasNext();) {
- Method[] m = (Method[])i.next();
- System.arraycopy(m, 0, result, index, m.length);
- index += m.length;
- }
- return result;
- }
-
- /**
- * Returns the first method that matches the specified name and parameter types. The overriding
- * methods have priority. The method is chosen from all the methods of the current class and all
- * its superclasses and superinterfaces.
- *
- * @return the matching method or null if no mathching method has been found.
- */
- Method getMethod(Class target, String methodName, Class[] types) {
-
- if (types == null) {
- types = new Class[0];
- }
-
- Method[] methods = getAllMethods(target);
- methods: for(int i = 0; i < methods.length; i++) {
- Method m = methods[i];
- if (!methodName.equals(m.getName())) {
- continue;
- }
- Class[] parameters = m.getParameterTypes();
- if (types.length != parameters.length) {
- continue;
- }
- for(int j = 0; j < types.length; j++) {
- if (!types[j].equals(parameters[j])) {
- continue methods;
- }
- }
- return m;
- }
- return null;
- }
-
-
- /**
- * Invokes the method with the supplied arguments against the target object.
- * If a method lookup is provided, it will be used. Otherwise, the default
- * method lookup will be used.
- * @param target - the object that you want to invoke the method on
- * @return an object
- */
- public Object invoke(Object target) throws Throwable {
- Class cl;
- Method meth=null;
- Object retval=null;
-
-
- if(method_name == null || target == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.MethodCall_METHOD_NAME_OR_TARGET_IS_NULL);
- return null;
- }
- cl=target.getClass();
- try {
- switch(mode) {
- case OLD:
- meth=findMethod(cl);
- break;
- case METHOD:
- if(this.method != null)
- meth=this.method;
- break;
- case TYPES:
- //meth=cl.getDeclaredMethod(method_name, types);
- meth = getMethod(cl, method_name, types);
- break;
- case SIGNATURE:
- Class[] mytypes=null;
- if(signature != null)
- mytypes=getTypesFromString(cl, signature);
- //meth=cl.getDeclaredMethod(method_name, mytypes);
- meth = getMethod(cl, method_name, mytypes);
- break;
- case ID:
- break;
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.MethodCall_MODE__0__IS_INVALID, mode);
- break;
- }
-
- if(meth != null) {
- retval=meth.invoke(target, args);
- }
- else {
- if(log.isErrorEnabled()) log.error(ExternalStrings.MethodCall_METHOD__0__NOT_FOUND, method_name);
- }
- return retval;
- }
- catch(InvocationTargetException inv_ex) {
- throw inv_ex.getTargetException();
- }
- catch(NoSuchMethodException no) {
- StringBuffer sb=new StringBuffer();
- sb.append("found no method called ").append(method_name).append(" in class ");
- sb.append(cl.getName()).append(" with (");
- if(args != null) {
- for(int i=0; i < args.length; i++) {
- if(i > 0)
- sb.append(", ");
- sb.append((args[i] != null)? args[i].getClass().getName() : "null");
- }
- }
- sb.append(") formal parameters");
- log.error(sb.toString());
- throw no;
- }
- catch(Throwable e) {
- // e.printStackTrace(System.err);
- if(log.isErrorEnabled()) log.error(ExternalStrings.MethodCall_EXCEPTION_IN_INVOKE, e);
- throw e;
- }
- }
-
- public Object invoke(Object target, Object[] args) throws Throwable {
- if(args != null)
- this.args=args;
- return invoke(target);
- }
-
-
- Class[] getTypesFromString(Class cl, String[] signature) throws Exception {
- String name;
- Class parameter;
- Class[] mytypes=new Class[signature.length];
-
- for(int i=0; i < signature.length; i++) {
- name=signature[i];
- if("long".equals(name))
- parameter=long.class;
- else if("int".equals(name))
- parameter=int.class;
- else if("short".equals(name))
- parameter=short.class;
- else if("char".equals(name))
- parameter=char.class;
- else if("byte".equals(name))
- parameter=byte.class;
- else if("float".equals(name))
- parameter=float.class;
- else if("double".equals(name))
- parameter=double.class;
- else if("boolean".equals(name))
- parameter=boolean.class;
- else
- parameter=Class.forName(name);
- mytypes[i]=parameter;
- }
- return mytypes;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer ret=new StringBuffer();
- boolean first=true;
- if(method_name != null)
- ret.append(method_name);
- else
- ret.append(method_id);
- ret.append('(');
- if(args != null) {
- for(int i=0; i < args.length; i++) {
- if(first)
- first=false;
- else
- ret.append(", ");
- ret.append(args[i]);
- }
- }
- ret.append(')');
- return ret.toString();
- }
-
- public String toStringDetails() {
- StringBuffer ret=new StringBuffer();
- ret.append("MethodCall ");
- if(method_name != null)
- ret.append("name=").append(method_name);
- else
- ret.append("id=").append(method_id);
- ret.append(", number of args=").append((args != null? args.length : 0)).append(')');
- if(args != null) {
- ret.append("\nArgs:");
- for(int i=0; i < args.length; i++) {
- ret.append("\n[").append(args[i]).append(" (").
- append((args[i] != null? args[i].getClass().getName() : "null")).append(")]");
- }
- }
- return ret.toString();
- }
-
-
- public void writeExternal(ObjectOutput out) throws IOException {
- if(method_name != null) {
- out.writeBoolean(true);
- out.writeUTF(method_name);
- }
- else {
- out.writeBoolean(false);
- out.writeShort(method_id);
- }
- out.writeObject(args);
- out.writeShort(mode);
-
- switch(mode) {
- case OLD:
- break;
- case METHOD:
- out.writeObject(method.getParameterTypes());
- out.writeObject(method.getDeclaringClass());
- break;
- case TYPES:
- out.writeObject(types);
- break;
- case SIGNATURE:
- out.writeObject(signature);
- break;
- case ID:
- break;
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.MethodCall_MODE__0__IS_INVALID, mode);
- break;
- }
-
- if(payload != null) {
- out.writeBoolean(true);
- out.writeObject(payload);
- }
- else {
- out.writeBoolean(false);
- }
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- boolean name_available=in.readBoolean();
- if(name_available)
- method_name=in.readUTF();
- else
- method_id=in.readShort();
- args=(Object[])in.readObject();
- mode=in.readShort();
-
- switch(mode) {
- case OLD:
- break;
- case METHOD:
- Class[] parametertypes=(Class[])in.readObject();
- Class declaringclass=(Class)in.readObject();
- try {
- method=declaringclass.getDeclaredMethod(method_name, parametertypes);
- }
- catch(NoSuchMethodException e) {
- throw new IOException(e.toString());
- }
- break;
- case TYPES:
- types=(Class[])in.readObject();
- break;
- case SIGNATURE:
- signature=(String[])in.readObject();
- break;
- case ID:
- break;
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.MethodCall_MODE__0__IS_INVALID, mode);
- break;
- }
-
- boolean payload_available=in.readBoolean();
- if(payload_available) {
- payload=(Map)in.readObject();
- }
- }
-
-
-
-
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodLookup.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodLookup.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodLookup.java
deleted file mode 100644
index 9ebcb30..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/MethodLookup.java
+++ /dev/null
@@ -1,15 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-import java.lang.reflect.Method;
-
-/**
- * @author Bela Ban
- * @version $Id: MethodLookup.java,v 1.3 2005/07/22 08:59:20 belaban Exp $
- */
-public interface MethodLookup {
- Method findMethod(short id);
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NBMessageForm_NIO.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NBMessageForm_NIO.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NBMessageForm_NIO.java
deleted file mode 100644
index 48adfca..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NBMessageForm_NIO.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: NBMessageForm_NIO.java,v 1.3 2005/06/30 15:38:43 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-/**
- * NBMessageForm - Message form for non-blocking message reads.
- * @author akbollu
- * @author Bela Ban
- */
-public class NBMessageForm_NIO
-{
- ByteBuffer headerBuffer = null;
- ByteBuffer dataBuffer = null;
- static final int HEADER_SIZE = 4;
- static/*GemStoneAddition*/ final boolean isComplete = false;
- int messageSize = 0;
-// boolean w_in_p = false; GemStoneAddition
- SocketChannel channel = null;
-
-
-
- public NBMessageForm_NIO(int dataBuffSize, SocketChannel ch)
- {
- headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
- dataBuffer = ByteBuffer.allocate(dataBuffSize);
- channel = ch;
- }
-
-
-
- public ByteBuffer readCompleteMsgBuffer() throws IOException
- {
-
- int rt;
-
- try {
- rt = channel.read(headerBuffer);
- if ( (rt == 0) || (rt == -1) )
- {
- channel.close();
- return null;
- }
- if (rt == HEADER_SIZE)
- {
- headerBuffer.flip();
- messageSize = headerBuffer.getInt();
- if(dataBuffer.capacity() < messageSize)
- {
- dataBuffer = ByteBuffer.allocate(messageSize);
- }
- }
- else {
- return null;
- }
- }
- catch(IOException ex) {
- channel.close();
- throw ex;
- }
-
-
- //rt == 0 need not be checked twice in the same event
- channel.read(dataBuffer);
- if(isComplete())
- {
- dataBuffer.flip();
- return dataBuffer;
- }
- return null;
- }
-
-
-
- public void reset()
- {
- dataBuffer.clear();
- headerBuffer.clear();
- messageSize = 0;
-// w_in_p = false; GemStoneAddition
- }
-
- private boolean isComplete()
- {
- return ( dataBuffer.position() == messageSize );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NotificationBus.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NotificationBus.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NotificationBus.java
deleted file mode 100644
index a77c12e..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/NotificationBus.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: NotificationBus.java,v 1.9 2005/07/17 11:36:40 chrislott Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Promise;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.Serializable;
-import java.util.Vector;
-
-
-/**
- * This class provides notification sending and handling capability.
- * Producers can send notifications to all registered consumers.
- * Provides hooks to implement shared group state, which allows an
- * application programmer to maintain a local cache which is replicated
- * by all instances. NotificationBus sits on
- * top of a channel, however it creates its channel itself, so the
- * application programmers do not have to provide their own channel.
- *
- * @author Bela Ban
- */
-public class NotificationBus implements MessageListener, MembershipListener {
- final Vector members=new Vector();
- JChannel channel=null;
- Address local_addr=null;
- PullPushAdapter ad=null;
- Consumer consumer=null; // only a single consumer allowed
- String bus_name="notification_bus";
- final Promise get_cache_promise=new Promise();
- final Object cache_mutex=new Object();
-
- protected final GemFireTracer log=GemFireTracer.getLog(getClass());
-
-
- String props=null;
-
-
- public interface Consumer {
- void handleNotification(Serializable n);
-
- /** Called on the coordinator to obtains its cache */
- Serializable getCache();
-
- void memberJoined(Address mbr);
-
- void memberLeft(Address mbr);
- }
-
-
- public NotificationBus() throws Exception {
- this(null, null);
- }
-
-
- public NotificationBus(String bus_name) throws Exception {
- this(bus_name, null);
- }
-
-
- public NotificationBus(String bus_name, String properties) throws Exception {
- if(bus_name != null) this.bus_name=bus_name;
- if(properties != null) props=properties;
- channel=new JChannel(props);
- }
-
-
- public void setConsumer(Consumer c) {
- consumer=c;
- }
-
-
- public Address getLocalAddress() {
- if(local_addr != null) return local_addr;
- if(channel != null)
- local_addr=channel.getLocalAddress();
- return local_addr;
- }
-
-
- /**
- * Returns a reference to the real membership: don't modify.
- * If you need to modify, make a copy first !
- * @return Vector of Address objects
- */
- public Vector getMembership() {
- return members;
- }
-
-
- /**
- * Answers the Channel.
- * Used to operate on the underlying channel directly, e.g. perform operations that are not
- * provided using only NotificationBus. Should be used sparingly.
- * @return underlying Channel
- */
- public Channel getChannel() {
- return channel;
- }
-
-
- public boolean isCoordinator() {
- Object first_mbr=null;
-
- synchronized(members) {
- first_mbr=members.size() > 0 ? members.elementAt(0) : null;
- if(first_mbr == null)
- return true;
- }
- if(getLocalAddress() != null)
- return getLocalAddress().equals(first_mbr);
- return false;
- }
-
-
- public void start() throws Exception {
- channel.connect(bus_name);
- ad=new PullPushAdapter(channel, this, this);
- }
-
-
- public void stop() {
- if(ad != null) {
- ad.stop();
- ad=null;
- }
- if(channel != null) {
- channel.close(); // disconnects from channel and closes it
- channel=null;
- }
- }
-
-
- /** Pack the argument in a Info, serialize that one into the message buffer and send the message */
- public void sendNotification(Serializable n) {
- Message msg=null;
- byte[] data=null;
- Info info;
-
- try {
- if(n == null) return;
- info=new Info(Info.NOTIFICATION, n);
- data=Util.objectToByteBuffer(info);
- msg=new Message(null, null, data);
- if(channel == null) {
- if(log.isErrorEnabled()) log.error("channel is null. " +
- " Won't send notification");
- return;
- }
- channel.send(msg);
- }
- catch(Throwable ex) {
-
- if(log.isErrorEnabled()) log.error(ExternalStrings.NotificationBus_EXCEPTION_IS__0, ex);
- }
- }
-
-
- /**
- Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
- null will be returned. Used only internally by NotificationBus.
- @param timeout Max number of msecs until the call returns
- @param max_tries Max number of attempts to fetch the cache from the coordinator
- */
- public Serializable getCacheFromCoordinator(long timeout, int max_tries) {
- return getCacheFromMember(null, timeout, max_tries);
- }
-
-
- /**
- Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
- null will be returned. Used only internally by NotificationBus.
- @param mbr The address of the member from which to fetch the state. If null, the current coordinator
- will be asked for the state
- @param timeout Max number of msecs until the call returns - if timeout elapses
- null will be returned
- @param max_tries Max number of attempts to fetch the cache from the coordinator (will be set to 1 if < 1)
- */
- public Serializable getCacheFromMember(Address mbr, long timeout, int max_tries) {
- Serializable cache=null;
- int num_tries=0;
- Info info; // GemStoneAddition =new Info(Info.GET_CACHE_REQ);
- Message msg;
- Address dst=mbr; // member from which to fetch the cache
-
- long start, stop; // +++ remove
-
-
- if(max_tries < 1) max_tries=1;
-
- get_cache_promise.reset();
- while(num_tries <= max_tries) {
- if(mbr == null) { // mbr == null means get cache from coordinator
- dst=determineCoordinator();
- if(dst == null || dst.equals(getLocalAddress())) { // we are the first member --> empty cache
- if(log.isInfoEnabled()) log.info("[" + getLocalAddress() +
- "] no coordinator found --> first member (cache is empty)");
- return null;
- }
- }
-
- // +++ remove
- if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + "] dst=" + dst +
- ", timeout=" + timeout + ", max_tries=" + max_tries + ", num_tries=" + num_tries);
-
- info=new Info(Info.GET_CACHE_REQ);
- msg=new Message(dst, null, info);
- channel.down(new Event(Event.MSG, msg));
-
- start=System.currentTimeMillis();
- cache=(Serializable) get_cache_promise.getResult(timeout);
- stop=System.currentTimeMillis();
- if(cache != null) {
- if(log.isInfoEnabled()) log.info("got cache from " +
- dst + ": cache is valid (waited " + (stop - start) + " msecs on get_cache_promise)");
- return cache;
- }
- else {
- if(log.isErrorEnabled()) log.error("received null cache; retrying (waited " +
- (stop - start) + " msecs on get_cache_promise)");
- }
-
- try { // GemStoneAddition
- Util.sleep(500);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error(ExternalStrings.NotificationBus_THREAD_INTERRUPTED);
- // treat as timeout
- break;
- }
- ++num_tries;
- }
-// if(cache == null) GemStoneAddition (can only be null)
- if(log.isErrorEnabled()) log.error("[" + getLocalAddress() +
- "] cache is null (num_tries=" + num_tries + ')');
- return cache;
- }
-
-
- /**
- Don't multicast this to all members, just apply it to local consumers.
- */
- public void notifyConsumer(Serializable n) {
- if(consumer != null && n != null)
- consumer.handleNotification(n);
- }
-
-
- /* -------------------------------- Interface MessageListener -------------------------------- */
- public void receive(Message msg) {
- Info info=null;
- Object obj;
-
- if(msg == null || msg.getLength() == 0) return;
- try {
- obj=msg.getObject();
- if(!(obj instanceof Info)) {
-
- if(log.isErrorEnabled()) log.error("expected an instance of Info (received " +
- obj.getClass().getName() + ')');
- return;
- }
- info=(Info) obj;
- switch(info.type) {
- case Info.NOTIFICATION:
- notifyConsumer(info.data);
- break;
-
- case Info.GET_CACHE_REQ:
- handleCacheRequest(msg.getSrc());
- break;
-
- case Info.GET_CACHE_RSP:
- // +++ remove
- if(log.isDebugEnabled()) log.debug("[GET_CACHE_RSP] cache was received from " + msg.getSrc());
- get_cache_promise.setResult(info.data);
- break;
-
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.NotificationBus_TYPE__0__UNKNOWN, info.type);
- break;
- }
- }
- catch(Throwable ex) {
-
- if(log.isErrorEnabled()) log.error(ExternalStrings.NotificationBus_EXCEPTION_0, ex);
- }
- }
-
- public byte[] getState() {
- return null;
- }
-
- public void setState(byte[] state) {
- }
-
- /* ----------------------------- End of Interface MessageListener ---------------------------- */
-
-
-
-
- /* ------------------------------- Interface MembershipListener ------------------------------ */
-
- public synchronized void viewAccepted(View new_view) {
- Vector joined_mbrs, left_mbrs, tmp;
- Object tmp_mbr;
-
- if(new_view == null) return;
- tmp=new_view.getMembers();
-
- synchronized(members) {
- // get new members
- joined_mbrs=new Vector();
- for(int i=0; i < tmp.size(); i++) {
- tmp_mbr=tmp.elementAt(i);
- if(!members.contains(tmp_mbr))
- joined_mbrs.addElement(tmp_mbr);
- }
-
- // get members that left
- left_mbrs=new Vector();
- for(int i=0; i < members.size(); i++) {
- tmp_mbr=members.elementAt(i);
- if(!tmp.contains(tmp_mbr))
- left_mbrs.addElement(tmp_mbr);
- }
-
- // adjust our own membership
- members.removeAllElements();
- members.addAll(tmp);
- }
-
- if(consumer != null) {
- if(joined_mbrs.size() > 0)
- for(int i=0; i < joined_mbrs.size(); i++)
- consumer.memberJoined((Address) joined_mbrs.elementAt(i));
- if(left_mbrs.size() > 0)
- for(int i=0; i < left_mbrs.size(); i++)
- consumer.memberLeft((Address) left_mbrs.elementAt(i));
- }
- }
-
-
- public void suspect(SuspectMember suspected_mbr) {
- }
-
- public void block() {
- }
-
-
- public void channelClosing(Channel c, Exception e) {} // GemStoneAddition
-
-
- /* ----------------------------- End of Interface MembershipListener ------------------------- */
-
-
-
-
-
-
-
- /* ------------------------------------- Private Methods ------------------------------------- */
-
- Address determineCoordinator() {
- Vector v=channel != null ? channel.getView().getMembers() : null;
- return v != null ? (Address) v.elementAt(0) : null;
- }
-
-
- void handleCacheRequest(Address sender) {
- Serializable cache=null;
- Message msg;
- Info info;
-
- if(sender == null) {
- // +++ remove
- //
- if(log.isErrorEnabled()) log.error(ExternalStrings.NotificationBus_SENDER_IS_NULL);
- return;
- }
-
- synchronized(cache_mutex) {
- cache=getCache(); // get the cache from the consumer
- info=new Info(Info.GET_CACHE_RSP, cache);
- msg=new Message(sender, null, info);
- if(log.isInfoEnabled()) log.info(ExternalStrings.NotificationBus__0__RETURNING_CACHE_TO__1, new Object[] {getLocalAddress(), sender});
- channel.down(new Event(Event.MSG, msg));
- }
- }
-
- public Serializable getCache() {
- return consumer != null ? consumer.getCache() : null;
- }
-
-
-
- /* --------------------------------- End of Private Methods ---------------------------------- */
-
-
-
-
-
- private static class Info implements Serializable {
- private static final long serialVersionUID = -2247826108262348005L;
- public final static int NOTIFICATION=1;
- public final static int GET_CACHE_REQ=2;
- public final static int GET_CACHE_RSP=3;
-
-
- int type=0;
- Serializable data=null; // if type == NOTIFICATION data is notification, if type == GET_CACHE_RSP, data is cache
-
-
- public Info(int type) {
- this.type=type;
- }
-
- public Info(int type, Serializable data) {
- this.type=type;
- this.data=data;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer sb=new StringBuffer();
- sb.append("type= ");
- if(type == NOTIFICATION)
- sb.append("NOTIFICATION");
- else if(type == GET_CACHE_REQ)
- sb.append("GET_CACHE_REQ");
- else if(type == GET_CACHE_RSP)
- sb.append("GET_CACHE_RSP");
- else
- sb.append("<unknown>");
- if(data != null) {
- if(type == NOTIFICATION)
- sb.append(", notification=" + data);
- else if(type == GET_CACHE_RSP) sb.append(", cache=" + data);
- }
- return sb.toString();
- }
- }
-
-
-}
-
-
-