You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:23:06 UTC
[42/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationManager.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationManager.java
deleted file mode 100644
index d8b16d3..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationManager.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: ReplicationManager.java,v 1.7 2004/09/23 16:29:11 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.util.RspList;
-
-import java.io.Serializable;
-
-
-
-
-
-/**
- * Class to propagate updates to a number of nodes in various ways:
- * <ol>
- * <li>Asynchronous
- * <li>Synchronous
- * <li>Synchronous with locking
- * </ol>
- *
- * <br/><em>Note: This class is experimental as of Oct 2002</em>
- *
- * @author Bela Ban Oct 2002
- */
-public class ReplicationManager implements RequestHandler {
- Address local_addr=null;
- ReplicationReceiver receiver=null;
-
- /** Used to broadcast updates and receive responses (latter only in synchronous case) */
- protected MessageDispatcher disp=null;
-
- protected final GemFireTracer log=GemFireTracer.getLog(this.getClass());
-
-
-
- /**
- * Creates an instance of ReplicationManager on top of a Channel
- */
- public ReplicationManager(Channel channel,
- MessageListener ml,
- MembershipListener l,
- ReplicationReceiver receiver) {
- setReplicationReceiver(receiver);
- if(channel != null) {
- local_addr=channel.getLocalAddress();
- disp=new MessageDispatcher(channel,
- ml,
- l,
- this, // ReplicationManager is RequestHandler
- true); // use deadlock detection
- }
- }
-
-
- /**
- * Creates an instance of ReplicationManager on top of a PullPushAdapter
- */
- public ReplicationManager(PullPushAdapter adapter,
- Serializable id,
- MessageListener ml,
- MembershipListener l,
- ReplicationReceiver receiver) {
- if(adapter != null && adapter.getTransport() != null && adapter.getTransport() instanceof Channel)
- local_addr=((Channel)adapter.getTransport()).getLocalAddress();
- setReplicationReceiver(receiver);
- disp=new MessageDispatcher(adapter,
- id, // FIXME
- ml,
- l,
- this); // ReplicationManager is RequestHandler
- disp.setDeadlockDetection(true);
- }
-
-
-
- public void stop() {
- if(disp != null)
- disp.stop();
- }
-
-
-
- /**
- * Create a new transaction. The transaction will be used to send updates, identify updates in the same transaction,
- * and eventually commit or rollback the changes associated with the transaction.
- * @return Xid A unique transaction
- * @exception Exception Thrown when local_addr is null
- */
- public Xid begin() throws Exception {
- return begin(Xid.DIRTY_READS);
- }
-
-
- /**
- * Create a new transaction. The transaction will be used to send updates, identify updates in the same transaction,
- * and eventually commit or rollback the changes associated with the transaction.
- * @param transaction_mode Mode in which the transaction should run. Possible values are Xid.DIRTY_READS,
- * Xid.READ_COMMITTED, Xid.REPEATABLE_READ and Xid.SERIALIZABLE
- * @return Xid A unique transaction
- * @exception Exception Thrown when local_addr is null
- */
- public Xid begin(int transaction_mode) throws Exception {
- return Xid.create(local_addr, transaction_mode);
- }
-
-
- public void setReplicationReceiver(ReplicationReceiver handler) {
- this.receiver=handler;
- }
-
- public void setMembershipListener(MembershipListener l) {
- if(l == null)
- return;
- if(disp == null) { // GemStoneAddition: missing braces
- if(log.isErrorEnabled()) log.error(ExternalStrings.ReplicationManager_DISPATCHER_IS_NULL_CANNOT_SET_MEMBERSHIPLISTENER);
- }
- else
- disp.setMembershipListener(l);
- }
-
-
- /**
- * Sends a request to all members of the group. Sending is asynchronous (return immediately) or
- * synchronous (wait for all members to respond). If <code>use_locking</code> is true, then locking
- * will be used at the receivers to acquire locks before accessing/updating a resource. Locks can be
- * explicitly set using <code>lock_info</code> or implicitly through <code>data</code>. In the latter
- * case, locks are induced from the data sent, e.g. if the data is a request for updating a certain row
- * in a table, then we need to acquire a lock for that table.<p>
- * In case of using locks, if the transaction associated with update already has a lock for a given resource,
- * we will return. Otherwise, we will wait for <code>lock_acquisition_timeout</code> milliseconds. If the lock
- * is not granted within that time a <code>LockingException</code> will be thrown. (<em>We hope to
- * replace this timeout with a distributed deadlock detection algorithm in the future.</em>)<p>
- * We have 3 main use cases for this method:
- * <ol>
- * <li><b>Asynchronous</b>: sends the message and returns immediately. Argument <code>synchronous</code>
- * needs to be false. All other arguments except <code>data</code> are ignored and can be null. Will call
- * <code>update()</code> on the registered ReplicationReceiver at each receiver.
- * <li><b>Synchronous without locks</b>: sends the message, but returns only after responses from all members
- * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
- * first). Argument <code>synchronous</code> needs to be true. Argument <code>synchronous_timeout</code>
- * needs to be >= 0. If it is 0 the call will not time out, but wait for all responses.
- * All other arguments (besides <code>data</code>) are ignored.
- * <li><b>Synchronous with locks</b>: sends the message, but returns only after responses from all members
- * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
- * first). At the receiver's side we have to acquire a lock for the resource to be updated, if the
- * acquisition fails a LockingException will be thrown. The resource to be locked can be found in two ways:
- * either <code>data</code> contains the resource(s) to be acquired implicitly, or <code>lock_info</code>
- * lists the resources explicitly, or both. All the locks acquired at the receiver's side should be associated
- * with <code>transaction</code>. When a <code>commit()</code> is received, the receiver should commit
- * the modifications to the resource and release all locks. When a <code>rollback()</code> is received,
- * the receiver should remove all (temporary) modifications and release all locks associated with
- * <code>transaction</code>.
- * </ol>
- * In both the synchronous cases a List of byte[] will be returned if the data was sent to all receivers
- * successfully, containing byte buffers. The list may be empty.
- * @param dest The destination to which to send the message. Will be sent to all members if null.
- * @param data The data to be sent to all members. It may contain information about the resource to be locked.
- * @param synchronous If false the call is asynchronous, ie. non-blocking. If true, the method will wait
- * until responses from all members have been received (unless a timeout is defined, see below)
- * @param synchronous_timeout In a synchronous call, we will wait for responses from all members or until
- * <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes first). 0 means
- * to wait forever.
- * @param transaction The transaction under which all locks for resources should be acquired. The receiver
- * will probably maintain a lock table with resources as keys and transactions as values.
- * When an update is received, the receiver checks its lock table: if the resource is
- * not yet taken, the resource/transaction pair will be added to the lock table. Otherwise,
- * we check if the transaction's owner associated with the resource is the same as the caller.
- * If this is the case, the lock will be considered granted, otherwise we will wait for the
- * resource to become available (for a certain amount of time). When a transaction is
- * committed or rolled back, all resources associated with this transaction will be released.
- * @param lock_info Information about resource(s) to be acquired. This may be null, e.g. if this information
- * is already implied in <code>data</code>. Both <code>data</code> and <code>lock_info</code>
- * may be used to define the set of resources to be acquired.
- * @param lock_acquisition_timeout The number of milliseconds to wait until a lock acquisition request is
- * considered failed (causing a LockingException). If 0 we will wait forever.
- * (Note that this may lead to deadlocks).
- * @param lock_lease_timeout The number of milliseconds we want to keep the lock for a resource. After
- * this time has elapsed, the lock will be released. If 0 we won't release the lock(s)
- * @param use_locks If this is false, we will ignore all lock information (even if it is specified) and
- * not use locks at all.
- * @return RspList A list of Rsps ({@link com.gemstone.org.jgroups.util.Rsp}), one for each member. Each one is the result of
- * {@link ReplicationReceiver#receive}. If a member didn't send a response, the <code>received</code>
- * field will be false. If the member was suspected while waiting for a response, the <code>
- * suspected</code> field will be true. If the <code>receive()</code> method in the receiver returned
- * a value it will be in field <code>retval</code>. If the receiver threw an exception it will also
- * be in this field.
- */
- public RspList send(Address dest,
- byte[] data,
- boolean synchronous,
- long synchronous_timeout,
- Xid transaction,
- byte[] lock_info,
- long lock_acquisition_timeout,
- long lock_lease_timeout,
- boolean use_locks) { // throws UpdateException, TimeoutException, LockingException {
-
- Message msg=null;
- ReplicationData d=new ReplicationData(ReplicationData.SEND,
- data,
- transaction,
- lock_info,
- lock_acquisition_timeout,
- lock_lease_timeout,
- use_locks);
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.ReplicationManager_DATA_IS__0__SYNCHRONOUS_1, new Object[] {d, Boolean.valueOf(synchronous)});
- msg=new Message(dest, null, d);
- if(synchronous) {
- return disp.castMessage(null, msg, GroupRequest.GET_ALL, synchronous_timeout);
- }
- else {
- disp.castMessage(null, msg, GroupRequest.GET_NONE, 0);
- return null;
- }
- }
-
-
- /**
- * Commits all modifications sent to the receivers via {@link #send} and releases all locks associated with
- * this transaction. If modifications were made to stable storage (but not to resource), those modifications
- * would now need to be transferred to the resource (e.g. database).
- */
- public void commit(Xid transaction) {
- sendMessage(ReplicationData.COMMIT, transaction);
- }
-
-
- /**
- * Discards all modifications sent to the receivers via {@link #send} and releases all locks associated with
- * this transaction.
- */
- public void rollback(Xid transaction) {
- sendMessage(ReplicationData.ROLLBACK, transaction);
- }
-
-
- /* ------------------------------- RequestHandler interface ------------------------------ */
-
- public Object handle(Message msg) {
- Object retval=null;
- ReplicationData data;
-
- if(msg == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ReplicationManager_RECEIVED_MESSAGE_WAS_NULL);
- return null;
- }
-
- if(msg.getLength() == 0) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ReplicationManager_PAYLOAD_OF_RECEIVED_MESSAGE_WAS_NULL);
- return null;
- }
-
- try {
- data=(ReplicationData)msg.getObject();
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ReplicationManager_FAILURE_UNMARSHALLING_MESSAGE__0, ex);
- return null;
- }
-
- switch(data.getType()) {
- case ReplicationData.SEND:
- try {
- return handleSend(data);
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ReplicationManager_FAILED_HANDLING_UPDATE__0, ex);
- return ex;
- }
- case ReplicationData.COMMIT:
- handleCommit(data.getTransaction());
- break;
- case ReplicationData.ROLLBACK:
- handleRollback(data.getTransaction());
- break;
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.ReplicationManager_RECEIVED_INCORRECT_REPLICATION_MESSAGE__0, data);
- return null;
- }
-
- return retval;
- }
-
- /* --------------------------- End of RequestHandler interface---------------------------- */
-
-
- protected Object handleSend(ReplicationData data) throws UpdateException, LockingException {
- try {
- if(receiver == null) {
- if(log.isWarnEnabled()) log.warn("receiver is not set");
- return null;
- }
- return receiver.receive(data.getTransaction(),
- data.getData(),
- data.getLockInfo(),
- data.getLockAcquisitionTimeout(),
- data.getLockLeaseTimeout(),
- data.useLocks());
- }
- catch(Throwable ex) {
- return ex;
- }
- }
-
-
- protected void handleCommit(Xid transaction) {
- if(receiver == null) {
- if(log.isWarnEnabled()) log.warn("receiver is not set");
- }
- else
- receiver.commit(transaction);
- }
-
- protected void handleRollback(Xid transaction) {
- if(receiver == null) {
- if(log.isWarnEnabled()) log.warn("receiver is not set");
- }
- else
- receiver.rollback(transaction);
- }
-
-
-
-
- /* -------------------------------------- Private methods ------------------------------------ */
-
-
- void sendMessage(int type, Xid transaction) {
- ReplicationData data=new ReplicationData(type, null, transaction, null, 0, 0, false);
- Message msg=new Message(null, null, data);
- disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // send commit message asynchronously
- }
-
-
- /* ---------------------------------- End of Private methods --------------------------------- */
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationReceiver.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationReceiver.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationReceiver.java
deleted file mode 100644
index 78e5105..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ReplicationReceiver.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: ReplicationReceiver.java,v 1.2 2005/07/17 11:36:40 chrislott Exp $
-
-
-package com.gemstone.org.jgroups.blocks;
-
-
-/**
- * Implementation of this interface needs to register with ReplicationManager and will receive updates to be
- * applied to its locally replicated data. If locks are used the implementation is responsible for lock acquisition
- * and management. To do so, it probably needs to maintain a lock table (keys = resource objects, values = transactions)
- * to associate resources with locks and possibly a transaction table (keys = transactions, values = locks) to keep
- * track of all locks for a given transaction (to commit/release all modifications/locks for a given transaction).
- *
- * @author Bela Ban Nov 19 2002
- */
-public interface ReplicationReceiver {
-
- /**
- * Receives data sent by a sender to all group members and applies update to locally replicated data. This is
- * the result of a {@link com.gemstone.org.jgroups.blocks.ReplicationManager#send} call.
- *
- * @param transaction The transaction under which all locks will be acquired. Will be null if no locks are used (e.g.
- * <code>use_locks</code> is false).
- * @param data The data to be modified. In case of a database, this data would have to be stored in stable storage,
- * and would only be applied on a <code>commit()</code>. In case of a distributed replicated in-memory
- * data structure, the update might be applied directly and the subsequent commit() or rollback() might
- * be ignored. Note that this argument may contain the resource to be locked; in this case the <code>
- * lock_info</code> parameter might be null.
- * @param lock_info Information about the resource(s) to be locked. Will be null if no locks are used (e.g.
- * <code>use_locks</code> is false). Can also be null even if locks are used, e.g. when the resource(s)
- * to be locked are an implicit part of <code>data</code>.
- * @param lock_acquisition_timeout If locks are used, the number of milliseconds to wait for a lock to be acquired.
- * If this time elapses, a TimeoutException will be thrown. A value of 0 means
- * to wait forever. If <code>use_locks</code> is false, this value is ignored.
- * @param lock_lease_timeout The number of milliseconds to hold on to the lock, once it is acquired. A value of 0
- * means to never release the lock until commit() or rollback() are called.
- * @param use_locks Whether to use locking or not. If this value is false, all lock-related arguments will be
- * ignored, regardless of whether they are non-null.
- * @return Object A return value, the semantics of which are determined by caller of {@link com.gemstone.org.jgroups.blocks.ReplicationManager#send}
- * and the receiver. If no special value should be returned, null can be returned. Note that in the
- * latter case, null is still treated as a response (in the synchronous call).
- * @exception LockingException Thrown when a lock on a resource cannot be acquired
- * @exception UpdateException Thrown when the update fails (application semantics)
- */
- Object receive(Xid transaction,
- byte[] data,
- byte[] lock_info,
- long lock_acquisition_timeout,
- long lock_lease_timeout,
- boolean use_locks) throws LockingException, UpdateException;
-
-
- /**
- * Commit the modifications to the locally replicated data and release all locks. If the receive() call already
- * applied the changes, then this method is a nop.
- */
- void commit(Xid transaction);
-
-
- /**
- * Discard all modifications and release all locks. If the receive() call already applied the changes,
- * this method will not be able to rollback the modifications, but will only release the locks.
- */
- void rollback(Xid transaction);
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestCorrelator.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestCorrelator.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestCorrelator.java
deleted file mode 100644
index 709aed8..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestCorrelator.java
+++ /dev/null
@@ -1,911 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: RequestCorrelator.java,v 1.24 2005/11/12 06:39:11 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-
-import java.util.concurrent.ConcurrentHashMap;
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Scheduler;
-import com.gemstone.org.jgroups.util.SchedulerListener;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.*;
-
-
-
-
-/**
- * Framework to send requests and receive matching responses (matching on
- * request ID).
- * Multiple requests can be sent at a time. Whenever a response is received,
- * the correct <code>RspCollector</code> is looked up (key = id) and its
- * method <code>receiveResponse()</code> invoked. A caller may use
- * <code>done()</code> to signal that no more responses are expected, and that
- * the corresponding entry may be removed.
- * <p>
- * <code>RequestCorrelator</code> can be installed at both client and server
- * sides, it can also switch roles dynamically; i.e., send a request and at
- * the same time process an incoming request (when local delivery is enabled,
- * this is actually the default).
- * <p>
- *
- * @author Bela Ban
- */
-public class RequestCorrelator {
-
- /** The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport */
- protected Object transport=null;
-
- /** The table of pending requests (keys=Long (request IDs), values=<tt>RequestEntry</tt>) */
- protected final Map requests=new ConcurrentHashMap();
-
- /** The handler for the incoming requests. It is called from inside the
- * dispatcher thread */
- protected RequestHandler request_handler=null;
-
- /** makes the instance unique (together with IDs) */
- protected String name=null;
-
- /** The dispatching thread pool */
- protected Scheduler scheduler=null;
-
-
- /** The address of this group member */
- protected Address local_addr=null;
-
- /**
- * This field is used only if deadlock detection is enabled.
- * In case of nested synchronous requests, it holds a list of the
- * addresses of the senders with the address at the bottom being the
- * address of the first caller
- */
- protected java.util.Stack call_stack=null;
-
- /** Whether or not to perform deadlock detection for synchronous (potentially recursive) group method invocations.
- * If on, we use a scheduler (handling a priority queue), otherwise we don't and call handleRequest() directly.
- */
- protected boolean deadlock_detection=false;
-
- /**
- * This field is used only if deadlock detection is enabled.
- * It sets the calling stack to the currently running request
- */
- protected CallStackSetter call_stack_setter=null;
-
- /** Process items on the queue concurrently (Scheduler). The default is to wait until the processing of an item
- * has completed before fetching the next item from the queue. Note that setting this to true
- * may destroy the properties of a protocol stack, e.g total or causal order may not be
- * guaranteed. Set this to true only if you know what you're doing ! */
- protected boolean concurrent_processing=false;
-
-
- protected boolean started=false;
-
- protected static final GemFireTracer log=GemFireTracer.getLog(RequestCorrelator.class);
-
-
- /**
- * Constructor. Uses transport to send messages. If <code>handler</code>
- * is not null, all incoming requests will be dispatched to it (via
- * <code>handle(Message)</code>).
- *
- * @param name Used to differentiate between different RequestCorrelators
- * (e.g. in different protocol layers). Has to be unique if multiple
- * request correlators are used.
- *
- * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be
- * used then), or a Protocol (passUp()/passDown() will be used)
- *
- * @param handler Request handler. Method <code>handle(Message)</code>
- * will be called when a request is received.
- */
- public RequestCorrelator(String name, Object transport, RequestHandler handler) {
- this.name = name;
- this.transport = transport;
- request_handler = handler;
- start();
- }
-
-
- public RequestCorrelator(String name, Object transport, RequestHandler handler, Address local_addr) {
- this.name = name;
- this.transport = transport;
- this.local_addr=local_addr;
- request_handler = handler;
- start();
- }
-
-
- /**
- * Constructor. Uses transport to send messages. If <code>handler</code>
- * is not null, all incoming requests will be dispatched to it (via
- * <code>handle(Message)</code>).
- *
- * @param name Used to differentiate between different RequestCorrelators
- * (e.g. in different protocol layers). Has to be unique if multiple
- * request correlators are used.
- *
- * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be
- * used then), or a Protocol (passUp()/passDown() will be used)
- *
- * @param handler Request handler. Method <code>handle(Message)</code>
- * will be called when a request is received.
- *
- * @param deadlock_detection When enabled (true) recursive synchronous
- * message calls will be detected and processed with higher priority in
- * order to solve deadlocks. Slows down processing a little bit when
- * enabled due to runtime checks involved.
- */
- public RequestCorrelator(String name, Object transport,
- RequestHandler handler, boolean deadlock_detection) {
- this.deadlock_detection = deadlock_detection;
- this.name = name;
- this.transport = transport;
- request_handler = handler;
- start();
- }
-
-
- public RequestCorrelator(String name, Object transport,
- RequestHandler handler, boolean deadlock_detection, boolean concurrent_processing) {
- this.deadlock_detection = deadlock_detection;
- this.name = name;
- this.transport = transport;
- request_handler = handler;
- this.concurrent_processing = concurrent_processing;
- start();
- }
-
- public RequestCorrelator(String name, Object transport,
- RequestHandler handler, boolean deadlock_detection, Address local_addr) {
- this.deadlock_detection = deadlock_detection;
- this.name = name;
- this.transport = transport;
- this.local_addr = local_addr;
- request_handler = handler;
- start();
- }
-
- public RequestCorrelator(String name, Object transport, RequestHandler handler,
- boolean deadlock_detection, Address local_addr, boolean concurrent_processing) {
- this.deadlock_detection = deadlock_detection;
- this.name = name;
- this.transport = transport;
- this.local_addr = local_addr;
- request_handler = handler;
- this.concurrent_processing = concurrent_processing;
- start();
- }
-
-
-
-
- /**
- * Switch the deadlock detection mechanism on/off
- * @param flag the deadlock detection flag
- */
- public void setDeadlockDetection(boolean flag) {
- if(deadlock_detection != flag) { // only set it if different
- deadlock_detection=flag;
- if(started) {
- if(deadlock_detection) {
- startScheduler();
- }
- else {
- stopScheduler();
- }
- }
- }
- }
-
-
- public void setRequestHandler(RequestHandler handler) {
- request_handler=handler;
- start();
- }
-
-
- public void setConcurrentProcessing(boolean concurrent_processing) {
- this.concurrent_processing=concurrent_processing;
- }
-
-
- /**
- * Helper method for {@link #sendRequest(long,List,Message,RspCollector)}.
- */
- public void sendRequest(long id, Message msg, RspCollector coll) {
- sendRequest(id, null, msg, coll);
- }
-
-
- /**
- * Send a request to a group. If no response collector is given, no
- * responses are expected (making the call asynchronous).
- *
- * @param id The request ID. Must be unique for this JVM (e.g. current
- * time in millisecs)
- * @param dest_mbrs The list of members who should receive the call. Usually a group RPC
- * is sent via multicast, but a receiver drops the request if its own address
- * is not in this list. Will not be used if it is null.
- * @param msg The request to be sent. The body of the message carries
- * the request data
- *
- * @param coll A response collector (usually the object that invokes
- * this method). Its methods <code>receiveResponse()</code> and
- * <code>suspect()</code> will be invoked when a message has been received
- * or a member is suspected, respectively.
- */
- public void sendRequest(long id, List dest_mbrs, Message msg, RspCollector coll) {
- Header hdr;
-
- if(transport == null) {
- if(log.isWarnEnabled()) log.warn("transport is not available !");
- return;
- }
-
- // i. Create the request correlator header and add it to the
- // msg
- // ii. If a reply is expected (sync call / 'coll != null'), add a
- // corresponding entry in the pending requests table
- // iii. If deadlock detection is enabled, set/update the call stack
- // iv. Pass the msg down to the protocol layer below
- hdr = new Header(Header.REQ, id, (coll!=null? true:false), name);
- hdr.dest_mbrs=dest_mbrs;
-
- if (coll != null) {
- if(deadlock_detection) {
- if(local_addr == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.RequestCorrelator_LOCAL_ADDRESS_IS_NULL_);
- return;
- }
- java.util.Stack new_call_stack = (call_stack != null?
- (java.util.Stack)call_stack.clone():new java.util.Stack());
- new_call_stack.push(local_addr);
- hdr.callStack=new_call_stack;
- }
- addEntry(hdr.id, new RequestEntry(coll));
- }
- msg.putHeader(name, hdr);
-
- try {
- if(transport instanceof Protocol)
- ((Protocol)transport).passDown(new Event(Event.MSG, msg));
- else if(transport instanceof Transport)
- ((Transport)transport).send(msg);
- else
- if(log.isErrorEnabled())
- log.error("transport object has to be either a " +
- "Transport or a Protocol, however it is a " + transport.getClass());
- }
- catch(Throwable e) {
- if(log.isWarnEnabled()) log.warn(e.toString());
- }
- }
-
-
-
-
-
- /**
- * Used to signal that a certain request may be garbage collected as
- * all responses have been received.
- */
- public void done(long id) {
- removeEntry(id);
- }
-
-
- /**
- * <b>Callback</b>.
- * <p>
- * Called by the protocol below when a message has been received. The
- * algorithm should test whether the message is destined for us and,
- * if not, pass it up to the next layer. Otherwise, it should remove
- * the header and check whether the message is a request or response.
- * In the first case, the message will be delivered to the request
- * handler registered (calling its <code>handle()</code> method), in the
- * second case, the corresponding response collector is looked up and
- * the message delivered.
- */
- public void receive(Event evt) {
- switch(evt.getType()) {
- case Event.SUSPECT: // don't wait for responses from faulty members
- receiveSuspect(((SuspectMember)evt.getArg()).suspectedMember); // GemStoneAddition SuspectMember struct
- break;
- case Event.VIEW_CHANGE: // adjust number of responses to wait for
- receiveView((View)evt.getArg());
- break;
-
- case Event.SET_LOCAL_ADDRESS:
- setLocalAddress((Address)evt.getArg());
- break;
- case Event.MSG:
- if(!receiveMessage((Message)evt.getArg()))
- return;
- break;
- }
- if(transport instanceof Protocol)
- ((Protocol)transport).passUp(evt);
- else
- if(log.isErrorEnabled()) log.error(ExternalStrings.RequestCorrelator_WE_DO_NOT_PASS_UP_MESSAGES_VIA_TRANSPORT);
- }
-
-
- /**
- */
- public void start() {
- if(deadlock_detection) {
- startScheduler();
- }
- started=true;
- }
-
- public void stop() {
- stopScheduler();
- started=false;
- }
-
-
- void startScheduler() {
- if(scheduler == null) {
- scheduler=new Scheduler();
- if(deadlock_detection && call_stack_setter == null) {
- call_stack_setter=new CallStackSetter();
- scheduler.setListener(call_stack_setter);
- }
- if(concurrent_processing)
- scheduler.setConcurrentProcessing(concurrent_processing);
- scheduler.start();
- }
- }
-
-
- void stopScheduler() {
- if(scheduler != null) {
- scheduler.stop();
- scheduler=null;
- }
- }
-
-
- // .......................................................................
-
-
-
-    /**
-     * Handles an <tt>Event.SUSPECT</tt> event received from a layer below.
-     * <p>
-     * Every registered response collector is told that <code>mbr</code> may
-     * have crashed, so that it stops waiting for a response from that member.
-     * A <code>null</code> member is ignored.
-     *
-     * @param mbr the suspected member
-     */
-    public void receiveSuspect(Address mbr) {
-        if(mbr == null)
-            return;
-        if(log.isDebugEnabled())
-            log.debug("suspect=" + mbr);
-
-        // iterate over a copy so we don't run into bug #761804 - Bela June 27 2003
-        ArrayList snapshot=new ArrayList(requests.values());
-        for(int i=0, n=snapshot.size(); i < n; i++) {
-            RspCollector collector=((RequestEntry)snapshot.get(i)).coll;
-            if(collector != null)
-                collector.suspect(mbr);
-        }
-    }
-
-
-    /**
-     * Handles an <tt>Event.VIEW_CHANGE</tt> event received from a layer below.
-     * <p>
-     * The new view is forwarded to every registered response collector, which
-     * is expected to mark responses from members that are not in
-     * <code>new_view</code> as NOT_RECEIVED.
-     *
-     * @param new_view the view that has just been installed
-     */
-    public void receiveView(View new_view) {
-        // iterate over a copy so we don't run into bug #761804 - Bela June 27 2003
-        ArrayList snapshot=new ArrayList(requests.values());
-        for(int i=0, n=snapshot.size(); i < n; i++) {
-            RspCollector collector=((RequestEntry)snapshot.get(i)).coll;
-            if(collector != null)
-                collector.viewChange(new_view);
-        }
-    }
-
-
- /**
- * Handles a message coming from a layer below.
- * <p>
- * Messages without a correlator header under our <code>name</code>, or whose
- * header names a different correlator, are not ours and are reported as
- * "pass up". Messages addressed to an explicit destination list that does
- * not contain the local address are dropped. Everything else is dispatched
- * on the header type: requests go to the scheduler (deadlock detection on)
- * or straight to {@link #handleRequest(Message)}; responses are handed to
- * the collector registered under the header's id.
- *
- * @param msg the received message
- * @return true if the event should be forwarded further up, otherwise false (message was consumed)
- */
- public boolean receiveMessage(Message msg) {
- Object tmpHdr;
-
- // i. If header is not an instance of request correlator header, ignore
- //
- // ii. Check whether the message was sent by a request correlator with
- // the same name (there may be multiple request correlators in the same
- // protocol stack...)
- tmpHdr=msg.getHeader(name);
- if(tmpHdr == null || !(tmpHdr instanceof Header)) {
- return true;
- }
-
- Header hdr=(Header)tmpHdr;
- if(hdr.corrName == null || !hdr.corrName.equals(name)) {
- if(log.isTraceEnabled()) {
- log.trace(new StringBuffer("name of request correlator header (").append(hdr.corrName).
- append(") is different from ours (").append(name).append("). Msg not accepted, passed up"));
- }
- return true;
- }
-
- // If the header contains a destination list, and we are not part of it, then we discard the
- // request (was addressed to other members)
- java.util.List dests=hdr.dest_mbrs;
- if(dests != null && local_addr != null && !dests.contains(local_addr)) {
- if(log.isTraceEnabled()) {
- log.trace(new StringBuffer("discarded request from ").append(msg.getSrc()).
- append(" as we are not part of destination list (local_addr=").
- append(local_addr).append(", hdr=").append(hdr).append(')'));
- }
- return false;
- }
-
-
- // [Header.REQ]:
- // i. If there is no request handler, discard
- // ii. Check whether priority: if synchronous and call stack contains
- // address that equals local address -> add priority request. Else
- // add normal request.
- //
- // [Header.RSP]:
- // Remove the msg request correlator header and notify the associated
- // <tt>RspCollector</tt> that a reply has been received
- switch(hdr.type) {
- case Header.REQ:
- if(request_handler == null) {
- if(log.isWarnEnabled()) {
- log.warn("there is no request handler installed to deliver request !");
- }
- return false;
- }
-
- if(deadlock_detection) {
- if(scheduler == null) {
- log.error("deadlock_detection is true, but scheduler is null: this is not supposed to happen" +
- " (discarding request)");
- break;
- }
-
- // the request is run later by the scheduler instead of on this thread
- Request req=new Request(msg);
- java.util.Stack stack=hdr.callStack;
- if(hdr.rsp_expected && stack != null && local_addr != null) {
- // our own address in the call stack means one of our pending calls led to this request
- if(stack.contains(local_addr)) {
- if(log.isTraceEnabled())
- log.trace("call stack=" + hdr.callStack + " contains " + local_addr +
- ": adding request to priority queue");
- scheduler.addPrio(req);
- break;
- }
- }
- scheduler.add(req);
- break;
- }
-
- // no deadlock detection: the request is handled directly in this call
- handleRequest(msg);
- break;
-
- case Header.RSP:
- msg.removeHeader(name);
- RspCollector coll=findEntry(hdr.id);
- // a response without a registered collector is silently dropped
- if(coll != null) {
- coll.receiveResponse(msg);
- }
- break;
-
- default:
- msg.removeHeader(name);
- if(log.isErrorEnabled()) log.error(ExternalStrings.RequestCorrelator_HEADERS_TYPE_IS_NEITHER_REQ_NOR_RSP_);
- break;
- }
-
- // every path through the switch consumes the message
- return (false);
- }
-
- /**
- * @return the local address currently stored in this correlator; may be
- * <code>null</code> if none has been set
- */
- public Address getLocalAddress() {
- return local_addr;
- }
-
- /**
- * Stores the local address. It is compared against the destination list and
- * the call stack of incoming request headers in <code>receiveMessage()</code>.
- *
- * @param local_addr the address of this member
- */
- public void setLocalAddress(Address local_addr) {
- this.local_addr=local_addr;
- }
-
-
- // .......................................................................
-
- /**
- * Add an association of:<br>
- * ID -> <tt>RspCollector</tt>
- */
- private void addEntry(long id, RequestEntry entry) {
- Long id_obj = Long.valueOf(id);
- synchronized(requests) {
- if(!requests.containsKey(id_obj))
- requests.put(id_obj, entry);
- else
- if(log.isWarnEnabled()) log.warn("entry " + entry + " for request-id=" + id + " already present !");
- }
- }
-
-
- /**
- * Remove the request entry associated with the given ID. Removing an ID
- * that is not registered has no further effect here.
- *
- * @param id the id of the <tt>RequestEntry</tt> to remove
- */
- private void removeEntry(long id) {
- Long id_obj = Long.valueOf(id);
-
- // changed by bela Feb 28 2003 (bug fix for 690606)
- // changed back to use synchronization by bela June 27 2003 (bug fix for #761804),
- // we can do this because we now copy for iteration (viewChange() and suspect())
- // NOTE(review): there is no explicit synchronized block here although addEntry()
- // locks on 'requests'; presumably the map is itself thread-safe - confirm
- // against the declaration of 'requests'.
- requests.remove(id_obj);
- }
-
-
- /**
- * @param id the ID of the corresponding <tt>RspCollector</tt>
- *
- * @return the <tt>RspCollector</tt> associated with the given ID
- */
- private RspCollector findEntry(long id) {
- Long id_obj = Long.valueOf(id);
- RequestEntry entry;
-
- entry=(RequestEntry)requests.get(id_obj);
- return((entry != null)? entry.coll:null);
- }
-
-
- /**
- * Handle a request msg for this correlator: strip the correlator header,
- * invoke the registered request handler and, if the sender expects a
- * response, send the handler's return value back.
- * <p>
- * Anything thrown by the handler is caught, logged and used as the return
- * value, so the caller receives the <code>Throwable</code> as the response.
- * Failures while serializing or sending the response are logged and not
- * propagated.
- *
- * @param req the request msg
- */
- protected/*GemStoneAddition*/ void handleRequest(Message req) {
- Object retval;
- byte[] rsp_buf=null;
- Header hdr, rsp_hdr;
- Message rsp;
-
- // i. Remove the request correlator header from the msg and pass it to
- // the registered handler
- //
- // ii. If a reply is expected, pack the return value from the request
- // handler to a reply msg and send it back. The reply msg has the same
- // ID as the request and the name of the sender request correlator
- // NOTE(review): hdr is dereferenced below without a null check; this relies on
- // callers only passing messages that carry our header - confirm.
- hdr = (Header)req.removeHeader(name);
-
- if(log.isTraceEnabled()) {
- log.trace(new StringBuffer("calling (").append((request_handler != null? request_handler.getClass().getName() : "null")).
- append(") with request ").append(hdr.id));
- }
-
- try {
- retval = request_handler.handle(req);
- }
- catch(Throwable t) {
- // includes the NullPointerException raised when no handler is installed
- if(log.isErrorEnabled()) log.error(ExternalStrings.RequestCorrelator_ERROR_INVOKING_METHOD, t);
- retval=t;
- }
-
- if(!hdr.rsp_expected) // asynchronous call, we don't need to send a response; terminate call here
- return;
-
- if(transport == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.RequestCorrelator_FAILURE_SENDING_RESPONSE_NO_TRANSPORT_AVAILABLE);
- return;
- }
-
- // changed (bela Feb 20 2004): catch exception and return exception
- try {
- rsp_buf=Util.objectToByteBuffer(retval); // retval could be an exception, or a real value
- }
- catch(Throwable t) {
- // the return value could not be serialized: send the serialization failure instead
- try {
- rsp_buf=Util.objectToByteBuffer(t); // this call should succeed (all exceptions are serializable)
- }
- catch(Throwable tt) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.RequestCorrelator_FAILED_SENDING_RSP_RETURN_VALUE__0__IS_NOT_SERIALIZABLE, retval);
- return;
- }
- }
-
- rsp=req.makeReply();
- if(rsp_buf != null)
- rsp.setBuffer(rsp_buf);
- // the response carries the request's id so the sender can match it to its collector
- rsp_hdr=new Header(Header.RSP, hdr.id, false, name);
- rsp.putHeader(name, rsp_hdr);
- if(log.isTraceEnabled())
- log.trace(new StringBuffer("sending rsp for ").append(rsp_hdr.id).append(" to ").append(rsp.getDest()));
-
- try {
- // a Protocol transport takes the response as an event, a plain Transport as a message
- if(transport instanceof Protocol)
- ((Protocol)transport).passDown(new Event(Event.MSG, rsp));
- else if(transport instanceof Transport)
- ((Transport)transport).send(rsp);
- else
- if(log.isErrorEnabled()) log.error("transport object has to be either a " +
- "Transport or a Protocol, however it is a " + transport.getClass());
- }
- catch(Throwable e) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.RequestCorrelator_FAILED_SENDING_THE_RESPONSE, e);
- }
- }
-
-
- // .......................................................................
-
-
-
-
-
- /**
- * Associates an ID with an <tt>RspCollector</tt>: instances are the values
- * stored in the <code>requests</code> map, keyed by request ID.
- */
- private static class RequestEntry {
- /** The collector to notify about responses, suspicions and view changes; may be null */
- public RspCollector coll = null;
-
- /**
- * @param coll the collector to wrap
- */
- public RequestEntry(RspCollector coll) {
- this.coll = coll;
- }
- }
-
-
-
- /**
- * The header for <tt>RequestCorrelator</tt> messages.
- * <p>
- * Two serialization forms are implemented: the object-stream form
- * (<code>writeExternal</code>/<code>readExternal</code>) and the data-stream
- * form (<code>writeTo</code>/<code>readFrom</code>). Within each pair the
- * reader consumes the fields in exactly the order the writer emits them, so
- * any change to one side must be mirrored on the other.
- */
- public static final class Header extends com.gemstone.org.jgroups.Header implements Streamable {
- public static final byte REQ = 0;
- public static final byte RSP = 1;
-
- /** Type of header: request or reply */
- public byte type=REQ;
- /**
- * The id of this request to distinguish among other requests from
- * the same <tt>RequestCorrelator</tt> */
- public long id=0;
-
- /** msg is synchronous if true */
- public boolean rsp_expected=true;
-
- /** The unique name of the associated <tt>RequestCorrelator</tt> */
- public String corrName=null;
-
- /** Stack of Address. Contains senders (e.g. P --> Q --> R) */
- public java.util.Stack callStack=null;
-
- /** Contains a list of members who should receive the request (others will drop). Ignored if null */
- public java.util.List dest_mbrs=null;
-
-
- /**
- * Used for externalization
- */
- public Header() {}
-
- /**
- * @param type type of header (<tt>REQ</tt>/<tt>RSP</tt>)
- * @param id id of this header relative to ids of other requests
- * originating from the same correlator
- * @param rsp_expected whether it's a sync or async request
- * @param name the name of the <tt>RequestCorrelator</tt> this header
- * belongs to (stored in <code>corrName</code>)
- */
- public Header(byte type, long id, boolean rsp_expected, String name) {
- this.type = type;
- this.id = id;
- this.rsp_expected = rsp_expected;
- this.corrName = name;
- }
-
- /**
- * @return a description listing name, type, id and rsp_expected, followed
- * by the call stack and the destination members when they are set
- */
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer ret=new StringBuffer();
- ret.append("[Header: name=" + corrName + ", type=");
- ret.append(type == REQ ? "REQ" : type == RSP ? "RSP" : "<unknown>");
- ret.append(", id=" + id);
- // note: the closing bracket is emitted here, before the optional fields below
- ret.append(", rsp_expected=" + rsp_expected + ']');
- if(callStack != null)
- ret.append(", call stack=" + callStack);
- if(dest_mbrs != null)
- ret.append(", dest_mbrs=").append(dest_mbrs);
- return ret.toString();
- }
-
-
- /**
- * Object-stream form: type, id, rsp_expected, a presence flag followed by
- * corrName when it is non-null, then callStack and dest_mbrs as objects.
- */
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeByte(type);
- out.writeLong(id);
- out.writeBoolean(rsp_expected);
- if(corrName != null) {
- out.writeBoolean(true);
- out.writeUTF(corrName);
- }
- else {
- out.writeBoolean(false);
- }
- out.writeObject(callStack);
- out.writeObject(dest_mbrs);
- }
-
-
- /**
- * Reads the fields in the order written by <code>writeExternal</code>.
- * corrName is only assigned when the presence flag is set.
- */
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type = in.readByte();
- id = in.readLong();
- rsp_expected = in.readBoolean();
- if(in.readBoolean())
- corrName = in.readUTF();
- callStack = (java.util.Stack)in.readObject();
- dest_mbrs=(java.util.List)in.readObject();
- }
-
- /**
- * Data-stream form: type, id, rsp_expected, optional corrName (presence
- * flag + UTF), optional call stack (presence flag, element count as a
- * short, then each address), then dest_mbrs via
- * <code>Util.writeAddresses</code>.
- */
- public void writeTo(DataOutputStream out) throws IOException {
- out.writeByte(type);
- out.writeLong(id);
- out.writeBoolean(rsp_expected);
-
- if(corrName != null) {
- out.writeBoolean(true);
- out.writeUTF(corrName);
- }
- else {
- out.writeBoolean(false);
- }
-
- if(callStack != null) {
- out.writeBoolean(true);
- // the element count is written as a short
- out.writeShort(callStack.size());
- Address mbr;
- for(int i=0; i < callStack.size(); i++) {
- mbr=(Address)callStack.elementAt(i);
- Util.writeAddress(mbr, out);
- }
- }
- else {
- out.writeBoolean(false);
- }
-
- Util.writeAddresses(dest_mbrs, out);
- }
-
- /**
- * Reads the fields in the order written by <code>writeTo</code>. corrName
- * and callStack are only assigned when their presence flag is set.
- */
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- boolean present;
- type=in.readByte();
- id=in.readLong();
- rsp_expected=in.readBoolean();
-
- present=in.readBoolean();
- if(present)
- corrName=in.readUTF();
-
- present=in.readBoolean();
- if(present) {
- callStack=new Stack();
- short len=in.readShort();
- Address tmp;
- for(short i=0; i < len; i++) {
- tmp=Util.readAddress(in);
- callStack.add(tmp);
- }
- }
-
- dest_mbrs=(List)Util.readAddresses(in, java.util.LinkedList.class);
- }
-
- /**
- * Computes the number of bytes accounted for this header, following the
- * field layout used by <code>writeTo</code>.
- *
- * @param version passed through to the <code>Util.size</code> helpers
- * @return the computed size in bytes
- */
- @Override // GemStoneAddition
- public long size(short version) {
- long retval=Global.BYTE_SIZE // type
- + Global.LONG_SIZE // id
- + Global.BYTE_SIZE; // rsp_expected
-
- retval+=Global.BYTE_SIZE; // presence for corrName
- // NOTE(review): counts one byte per char plus the 2-byte length prefix; writeUTF
- // uses more than one byte for non-ASCII chars, so this may underestimate - confirm
- // whether callers need an exact value.
- if(corrName != null)
- retval+=corrName.length() +2; // UTF
-
- retval+=Global.BYTE_SIZE; // presence
- if(callStack != null) {
- retval+=Global.SHORT_SIZE; // number of elements
- // every element is charged with the size of the first one
- if(callStack.size() > 0) {
- Address mbr=(Address)callStack.firstElement();
- retval+=callStack.size() * Util.size(mbr,version);
- }
- }
-
- retval+=Util.size(dest_mbrs, version);
- return retval;
- }
-
- }
-
-
-
-
-    /**
-     * Scheduler listener that keeps the correlator's current call chain
-     * (<code>call_stack</code>) in step with the request being executed: the
-     * stack is taken from the request's header when a task is started or
-     * resumed, and cleared when a task is stopped or suspended. Only
-     * synchronous requests are considered (the <code>Runnable</code> is
-     * actually a <code>Request</code>).
-     */
-    protected/*GemStoneAddition*/ class CallStackSetter implements SchedulerListener {
-        public void started(Runnable r) {
-            setCallStack(r);
-        }
-
-        public void stopped(Runnable r) {
-            setCallStack(null);
-        }
-
-        public void suspended(Runnable r) {
-            setCallStack(null);
-        }
-
-        public void resumed(Runnable r) {
-            setCallStack(r);
-        }
-
-        /**
-         * Clears the call stack for a <code>null</code> task; otherwise copies
-         * the call stack of the task's request header, provided the request
-         * carries our header, expects a response and has a call stack. In all
-         * other cases the current call stack is left untouched.
-         */
-        void setCallStack(Runnable r) {
-            if(r == null) {
-                call_stack=null;
-                return;
-            }
-
-            final Message request=((Request)r).req;
-            if(request == null)
-                return;
-
-            // instanceof is false for null, so this also covers a missing header
-            final Object candidate=request.getHeader(name);
-            if(!(candidate instanceof Header))
-                return;
-
-            final Header header=(Header)candidate;
-            if(!header.rsp_expected)
-                return;
-
-            final java.util.Stack source=header.callStack;
-            if(source != null)
-                call_stack=(java.util.Stack)source.clone();
-        }
-    }
-
-
-    /**
-     * Runnable wrapper around an incoming request message. It is handed to
-     * the scheduler, and running it passes the message to
-     * <code>handleRequest()</code>.
-     */
-    private class Request implements Runnable {
-        public final Message req;
-
-        public Request(Message req) {
-            this.req=req;
-        }
-
-        public void run() {
-            handleRequest(req);
-        }
-
-        /** @return the request and its headers, or an empty string when there is no message */
-        @Override // GemStoneAddition
-        public String toString() {
-            if(req == null)
-                return "";
-            return "req=" + req + ", headers=" + req.printObjectHeaders();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestHandler.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestHandler.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestHandler.java
deleted file mode 100644
index 47d97d9..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RequestHandler.java
+++ /dev/null
@@ -1,15 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: RequestHandler.java,v 1.1.1.1 2003/09/09 01:24:08 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-
-import com.gemstone.org.jgroups.Message;
-
-
-/**
- * Callback through which an incoming request message is delivered to the
- * application.
- */
-public interface RequestHandler {
- /**
- * Handles a request.
- *
- * @param msg the request message
- * @return the result of handling the request
- */
- Object handle(Message msg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RpcDispatcher.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RpcDispatcher.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RpcDispatcher.java
deleted file mode 100644
index 875e922..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RpcDispatcher.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: RpcDispatcher.java,v 1.20 2005/11/12 06:39:21 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Channel;
-import com.gemstone.org.jgroups.ChannelListener;
-import com.gemstone.org.jgroups.MembershipListener;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.MessageListener;
-import com.gemstone.org.jgroups.SuspectedException;
-import com.gemstone.org.jgroups.TimeoutException;
-import com.gemstone.org.jgroups.Transport;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.RspList;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.Serializable;
-import java.util.Vector;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.lang.reflect.Method;
-
-
-
-
-/**
- * This class allows a programmer to invoke remote methods in all (or single)
- * group members and optionally wait for the return value(s).
- * An application will typically create a channel and layer the
- * RpcDispatcher building block on top of it, which allows it to
- * dispatch remote methods (client role) and at the same time be
- * called by other members (server role).
- * This class is derived from MessageDispatcher.
- * It is the equivalent of RpcProtocol on the application rather than protocol level.
- * <p>
- * Additional {@link ChannelListener}s can be registered; they are notified
- * from a snapshot of the listener list, so a listener may add or remove
- * listeners from inside its callback.
- *
- * @author Bela Ban
- */
-public class RpcDispatcher extends MessageDispatcher implements ChannelListener {
-    /** The object on which incoming method calls are invoked; requests are discarded while null */
-    protected Object       server_obj=null;
-    /** Optional custom marshaller; the default serialization is used while null */
-    protected Marshaller   marshaller=null;
-    /** Listeners to be notified of channel state changes, in addition to this dispatcher */
-    protected List         additionalChannelListeners=null;
-    /** Resolves method IDs to methods for calls made in <code>MethodCall.ID</code> mode */
-    protected MethodLookup method_lookup=null;
-
-
-    public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj) {
-        super(channel, l, l2);
-        channel.addChannelListener(this);
-        this.server_obj=server_obj;
-        additionalChannelListeners = new ArrayList();
-    }
-
-
-    public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
-                         boolean deadlock_detection) {
-        super(channel, l, l2, deadlock_detection);
-        channel.addChannelListener(this);
-        this.server_obj=server_obj;
-        additionalChannelListeners = new ArrayList();
-    }
-
-    public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
-                         boolean deadlock_detection, boolean concurrent_processing) {
-        super(channel, l, l2, deadlock_detection, concurrent_processing);
-        channel.addChannelListener(this);
-        this.server_obj=server_obj;
-        additionalChannelListeners = new ArrayList();
-    }
-
-
-
-    public RpcDispatcher(PullPushAdapter adapter, Serializable id,
-                         MessageListener l, MembershipListener l2, Object server_obj) {
-        super(adapter, id, l, l2);
-
-        // Fixes bug #804956
-        // channel.setChannelListener(this);
-        if(this.adapter != null) {
-            Transport t=this.adapter.getTransport();
-            if(t != null && t instanceof Channel) {
-                ((Channel)t).addChannelListener(this);
-            }
-        }
-
-        this.server_obj=server_obj;
-        additionalChannelListeners = new ArrayList();
-    }
-
-
-    /** Pluggable conversion between objects and the byte buffers carried by messages. */
-    public interface Marshaller {
-        byte[] objectToByteBuffer(Object obj) throws Exception;
-        Object objectFromByteBuffer(byte[] buf) throws Exception;
-    }
-
-
-    public String getName() {return "RpcDispatcher";}
-
-    public void       setMarshaller(Marshaller m) {this.marshaller=m;}
-
-    public Marshaller getMarshaller()             {return marshaller;}
-
-    public Object getServerObject() {return server_obj;}
-
-    public MethodLookup getMethodLookup() {
-        return method_lookup;
-    }
-
-    public void setMethodLookup(MethodLookup method_lookup) {
-        this.method_lookup=method_lookup;
-    }
-
-
-    /**
-     * Not supported by this class: logs an error and returns <code>null</code>.
-     * Use {@link #callRemoteMethods(Vector, MethodCall, int, long)} instead.
-     */
-    @Override // GemStoneAddition
-    public RspList castMessage(Vector dests, Message msg, int mode, long timeout) {
-        if(log.isErrorEnabled()) log.error("this method should not be used with " +
-                    "RpcDispatcher, but MessageDispatcher. Returning null");
-        return null;
-    }
-
-    /**
-     * Not supported by this class: logs an error and returns <code>null</code>.
-     * Use {@link #callRemoteMethod(Address, MethodCall, int, long)} instead.
-     */
-    @Override // GemStoneAddition
-    public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
-        if(log.isErrorEnabled()) log.error("this method should not be used with " +
-                    "RpcDispatcher, but MessageDispatcher. Returning null");
-        return null;
-    }
-
-
-
-
-
-    public RspList callRemoteMethods(Vector dests, String method_name, Object[] args,
-                                     Class[] types, int mode, long timeout) {
-        MethodCall method_call=new MethodCall(method_name, args, types);
-        return callRemoteMethods(dests, method_call, mode, timeout);
-    }
-
-    public RspList callRemoteMethods(Vector dests, String method_name, Object[] args,
-                                     String[] signature, int mode, long timeout) {
-        MethodCall method_call=new MethodCall(method_name, args, signature);
-        return callRemoteMethods(dests, method_call, mode, timeout);
-    }
-
-
-    /**
-     * Marshals <code>method_call</code> and casts it to <code>dests</code>.
-     *
-     * @return the responses; an empty list when <code>dests</code> is an empty
-     *         (non-null) list; <code>null</code> when the call could not be marshalled
-     */
-    public RspList callRemoteMethods(Vector dests, MethodCall method_call, int mode, long timeout) {
-        if(dests != null && dests.size() == 0) {
-            // don't send if dest list is empty
-            if(log.isTraceEnabled())
-                log.trace(new StringBuffer("destination list of ").append(method_call.getName()).
-                          append("() is empty: no need to send message"));
-            return new RspList();
-        }
-
-        if(log.isTraceEnabled())
-            log.trace(new StringBuffer("dests=").append(dests).append(", method_call=").append(method_call).
-                      append(", mode=").append(mode).append(", timeout=").append(timeout));
-
-        byte[] buf;
-        try {
-            buf=marshaller != null? marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call);
-        }
-        catch(Exception e) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.RpcDispatcher_EXCEPTION_0, e);
-            return null;
-        }
-
-        Message msg=new Message(null, null, buf);
-        RspList  retval=super.castMessage(dests, msg, mode, timeout);
-        if(log.isTraceEnabled()) log.trace("responses: " + retval);
-        return retval;
-    }
-
-
-
-    public Object callRemoteMethod(Address dest, String method_name, Object[] args,
-                                   Class[] types, int mode, long timeout)
-            throws TimeoutException, SuspectedException {
-        MethodCall method_call=new MethodCall(method_name, args, types);
-        return callRemoteMethod(dest, method_call, mode, timeout);
-    }
-
-    public Object callRemoteMethod(Address dest, String method_name, Object[] args,
-                                   String[] signature, int mode, long timeout)
-            throws TimeoutException, SuspectedException {
-        MethodCall method_call=new MethodCall(method_name, args, signature);
-        return callRemoteMethod(dest, method_call, mode, timeout);
-    }
-
-    /**
-     * Marshals <code>method_call</code> and sends it to <code>dest</code>.
-     *
-     * @return the value returned by the underlying send; <code>null</code>
-     *         when the call could not be marshalled
-     */
-    public Object callRemoteMethod(Address dest, MethodCall method_call, int mode, long timeout)
-            throws TimeoutException, SuspectedException {
-        byte[]   buf=null;
-        Message  msg=null;
-        Object   retval=null;
-
-        if(log.isTraceEnabled())
-            log.trace("dest=" + dest + ", method_call=" + method_call + ", mode=" + mode + ", timeout=" + timeout);
-
-        try {
-            buf=marshaller != null? marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call);
-        }
-        catch(Exception e) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.RpcDispatcher_EXCEPTION_0, e);
-            return null;
-        }
-
-        msg=new Message(dest, null, buf);
-        retval=super.sendMessage(msg, mode, timeout);
-        if(log.isTraceEnabled()) log.trace("retval: " + retval);
-        return retval;
-    }
-
-
-
-
-
-    /**
-     * Message contains MethodCall. Execute it against the server object and return the result.
-     * Uses MethodCall.invoke() to do this.
-     *
-     * @return the invocation result; <code>null</code> when there is no server
-     *         object, the message is empty or the payload is not a MethodCall;
-     *         the <code>Throwable</code> itself when unmarshalling or the
-     *         invocation fails
-     */
-    @Override // GemStoneAddition
-    public Object handle(Message req) {
-        Object      body=null;
-        MethodCall  method_call;
-
-        if(server_obj == null) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.RpcDispatcher_NO_METHOD_HANDLER_IS_REGISTERED_DISCARDING_REQUEST);
-            return null;
-        }
-
-        if(req == null || req.getLength() == 0) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.RpcDispatcher_MESSAGE_OR_MESSAGE_BUFFER_IS_NULL);
-            return null;
-        }
-
-        try {
-            body=marshaller != null? marshaller.objectFromByteBuffer(req.getBuffer()) : req.getObject();
-        }
-        catch(Throwable e) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.RpcDispatcher_EXCEPTION_0, e);
-            return e;
-        }
-
-        if(body == null || !(body instanceof MethodCall)) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.RpcDispatcher_MESSAGE_DOES_NOT_CONTAIN_A_METHODCALL_OBJECT);
-            return null;
-        }
-
-        method_call=(MethodCall)body;
-
-        try {
-            if(log.isTraceEnabled())
-                log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call);
-
-            // calls made by ID carry no method; resolve it through the lookup first
-            if(method_call.getMode() == MethodCall.ID) {
-                if(method_lookup == null)
-                    throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set");
-                Method m=method_lookup.findMethod(method_call.getId());
-                if(m == null)
-                    throw new Exception("no method found for " + method_call.getId());
-                method_call.setMethod(m);
-            }
-
-            return method_call.invoke(server_obj);
-        }
-        catch(Throwable x) {
-            // the failure is returned (not thrown) so that it travels back as the response
-            if(log.isErrorEnabled()) log.error(ExternalStrings.RpcDispatcher_FAILED_INVOKING_METHOD, x);
-            return x;
-        }
-    }
-
-    /**
-     * Add a new channel listener to be notified on the channel's state change.
-     *
-     * @return true if the listener was added or false if the listener was already in the list.
-     */
-    public boolean addChannelListener(ChannelListener l) {
-
-        synchronized(additionalChannelListeners) {
-            if (additionalChannelListeners.contains(l)) {
-               return false;
-            }
-            additionalChannelListeners.add(l);
-            return true;
-        }
-    }
-
-
-    /**
-     * Removes a previously added channel listener.
-     *
-     * @return true if the listener was in the list and has been removed.
-     */
-    public boolean removeChannelListener(ChannelListener l) {
-
-        synchronized(additionalChannelListeners) {
-            return additionalChannelListeners.remove(l);
-        }
-    }
-
-
-    /**
-     * Copies the registered listeners while holding the list's lock. Callbacks
-     * are then invoked on the copy, outside the lock, so that a listener which
-     * adds or removes listeners from within its callback cannot break the
-     * iteration, and no foreign code runs while the lock is held.
-     */
-    private ChannelListener[] snapshotChannelListeners() {
-        synchronized(additionalChannelListeners) {
-            return (ChannelListener[])additionalChannelListeners.toArray(
-                    new ChannelListener[additionalChannelListeners.size()]);
-        }
-    }
-
-
-
-    /* --------------------- Interface ChannelListener ---------------------- */
-
-    public void channelConnected(Channel channel) {
-        ChannelListener[] listeners=snapshotChannelListeners();
-        for(int i=0; i < listeners.length; i++) {
-            try {
-                listeners[i].channelConnected(channel);
-            }
-            catch(Throwable t) {
-                // one failing listener must not prevent the others from being notified
-                log.warn("channel listener failed", t);
-            }
-        }
-    }
-
-    public void channelDisconnected(Channel channel) {
-
-        stop();
-
-        ChannelListener[] listeners=snapshotChannelListeners();
-        for(int i=0; i < listeners.length; i++) {
-            try {
-                listeners[i].channelDisconnected(channel);
-            }
-            catch(Throwable t) {
-                log.warn("channel listener failed", t);
-            }
-        }
-    }
-
-    public void channelClosed(Channel channel) {
-
-        stop();
-
-        ChannelListener[] listeners=snapshotChannelListeners();
-        for(int i=0; i < listeners.length; i++) {
-            try {
-                listeners[i].channelClosed(channel);
-            }
-            catch(Throwable t) {
-                log.warn("channel listener failed", t);
-            }
-        }
-    }
-
-    public void channelShunned() {
-        ChannelListener[] listeners=snapshotChannelListeners();
-        for(int i=0; i < listeners.length; i++) {
-            try {
-                listeners[i].channelShunned();
-            }
-            catch(Throwable t) {
-                log.warn("channel listener failed", t);
-            }
-        }
-    }
-
-    public void channelReconnected(Address new_addr) {
-        if(log.isTraceEnabled())
-            log.trace("channel has been rejoined, old local_addr=" + local_addr + ", new local_addr=" + new_addr);
-        this.local_addr=new_addr;
-        start();
-
-        ChannelListener[] listeners=snapshotChannelListeners();
-        for(int i=0; i < listeners.length; i++) {
-            try {
-                listeners[i].channelReconnected(new_addr);
-            }
-            catch(Throwable t) {
-                log.warn("channel listener failed", t);
-            }
-        }
-    }
-    /* ----------------------------------------------------------------------- */
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RspCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RspCollector.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RspCollector.java
deleted file mode 100644
index 81d1541..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/RspCollector.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: RspCollector.java,v 1.2 2004/03/30 06:47:12 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-
-
-/**
- * Receiver of the events that concern an outstanding request: responses,
- * suspected members and view changes.
- */
-public interface RspCollector {
- /** Called with a received response message. */
- void receiveResponse(Message msg);
- /** Called when <code>mbr</code> is suspected, so that no response from it is awaited. */
- void suspect(Address mbr);
- /** Called when a new view has been installed. */
- void viewChange(View new_view);
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingAdapter.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingAdapter.java
deleted file mode 100644
index e47d412..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingAdapter.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.ChannelException;
-
-/**
- * This adapter introduces simple two-phase voting on a specified decree. All
- * nodes in the group receive a decree in the "prepare" phase, where they express
- * their opinion on the decree. If all nodes voted positively on the decree, the
- * next phase, "commit", fixes the changes that were made in the "prepare" phase;
- * otherwise the changes are canceled in the "abort" phase.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- * @author Robert Schaffar-Taurok (robert@fusion.at)
- * @version $Id: TwoPhaseVotingAdapter.java,v 1.4 2005/06/08 15:56:54 publicnmi Exp $
- */
-public class TwoPhaseVotingAdapter {
-
-    private final VotingAdapter voteChannel;
-
-    /**
-     * Creates an instance of the class.
-     * @param voteChannel the channel that will be used for voting.
-     */
-    public TwoPhaseVotingAdapter(VotingAdapter voteChannel) {
-        this.voteChannel = voteChannel;
-    }
-
-    /**
-     * Wraps the actual listener in a {@link TwoPhaseVoteWrapper} and adds it to
-     * the voteChannel.
-     */
-    public void addListener(TwoPhaseVotingListener listener) {
-        voteChannel.addVoteListener(new TwoPhaseVoteWrapper(listener));
-    }
-
-    /**
-     * Removes the listener from the voteChannel. A fresh wrapper is passed to
-     * the channel; it is equal to the wrapper that was registered for the same
-     * listener (see {@link TwoPhaseVoteWrapper#equals(Object)}).
-     */
-    public void removeListener(TwoPhaseVotingListener listener) {
-        voteChannel.removeVoteListener(new TwoPhaseVoteWrapper(listener));
-    }
-
-    /**
-     * Performs the two-phase voting on the decree. After the voting each
-     * group member remains in the same state as others.
-     */
-    public boolean vote(Object decree, long timeout) throws ChannelException {
-        return vote(decree, timeout, null);
-    }
-
-    /**
-     * Performs the two-phase voting on the decree. After the voting each
-     * group member remains in the same state as others.
-     * <p>
-     * Each voting round (prepare, commit, abort) is given one third of
-     * <code>timeout</code>.
-     *
-     * @return <code>true</code> if both the prepare and the commit round were
-     *         accepted; <code>false</code> if either was rejected, in which
-     *         case an abort round has been run
-     * @throws ChannelException if a voting round fails; an abort round is
-     *         attempted before the exception is rethrown
-     */
-    public boolean vote(Object decree, long timeout, VoteResponseProcessor voteResponseProcessor) throws ChannelException {
-        // wrap real decree
-        TwoPhaseWrapper wrappedDecree = new TwoPhaseWrapper(decree);
-
-        // check the decree acceptance
-        try {
-            if (voteChannel.vote(wrappedDecree, timeout / 3, voteResponseProcessor)) {
-                wrappedDecree.commit();
-
-                // try to commit decree
-                if (!voteChannel.vote(wrappedDecree, timeout / 3, voteResponseProcessor)) {
-                    // strange, should fail during prepare... abort all
-                    wrappedDecree.abort();
-                    voteChannel.vote(wrappedDecree, timeout / 3, voteResponseProcessor);
-                    return false;
-                } else
-                    return true;
-
-            } else {
-                // somebody is not accepting the decree... abort
-                wrappedDecree.abort();
-                voteChannel.vote(wrappedDecree, timeout / 3, voteResponseProcessor);
-                return false;
-            }
-        } catch(ChannelException chex) {
-            wrappedDecree.abort();
-            voteChannel.vote(wrappedDecree, timeout / 3, voteResponseProcessor);
-            throw chex;
-        }
-    }
-
-
-    /**
-     * @return Returns the voteChannel.
-     */
-    public VotingAdapter getVoteChannel() {
-        return voteChannel;
-    }
-
-    /**
-     * Adapts a {@link TwoPhaseVotingListener} to the plain {@link VotingListener}
-     * interface by dispatching on the phase carried by the wrapped decree.
-     */
-    public static class TwoPhaseVoteWrapper implements VotingListener {
-
-        private final TwoPhaseVotingListener listener;
-
-        public TwoPhaseVoteWrapper(TwoPhaseVotingListener listener) {
-            this.listener = listener;
-        }
-
-        public boolean vote(Object decree) throws VoteException {
-            if (!(decree instanceof TwoPhaseWrapper))
-                throw new VoteException("Not my type of decree. Ignore me.");
-
-            TwoPhaseWrapper wrapper = (TwoPhaseWrapper)decree;
-
-            // invoke the corresponding operation
-            if (wrapper.isPrepare())
-                return listener.prepare(wrapper.getDecree());
-            else
-            if (wrapper.isCommit())
-                return listener.commit(wrapper.getDecree());
-            else {
-                listener.abort(wrapper.getDecree());
-                return false;
-            }
-        }
-
-        /*
-
-         This wrapper is considered equal to the object it wraps, and to any
-         other wrapper around an equal object.
-
-         hashCode():int is delegated to the wrapped listener. equals(Object)
-         first unwraps the other side when it is a wrapper too: without that
-         step two wrappers around the same listener would never be equal, and
-         a wrapper created only for removal could not match the registered one.
-
-        */
-
-        @Override // GemStoneAddition
-        public int hashCode() { return listener.hashCode(); }
-        @Override // GemStoneAddition
-        public boolean equals(Object other) {
-            if (this == other)
-                return true;
-            if (other instanceof TwoPhaseVoteWrapper)
-                return listener.equals(((TwoPhaseVoteWrapper)other).listener);
-            // kept for backward compatibility: comparison against a bare listener
-            return listener.equals(other);
-        }
-    }
-
-    /**
-     * Wrapper that turns a decree into a voting decree by adding the current
-     * phase (prepare, commit or abort). A new wrapper starts in the prepare phase.
-     */
-    public static class TwoPhaseWrapper implements java.io.Serializable {
-        private static final long serialVersionUID = 413742420131273083L;
-        private static final int PREPARE = 0;
-        private static final int COMMIT = 1;
-        private static final int ABORT = 2;
-
-        public TwoPhaseWrapper(Object decree) {
-            setDecree(decree);
-            setType(PREPARE);
-        }
-
-        private Object decree;
-        private int type;
-
-        public Object getDecree(){ return decree; }
-        public void setDecree(Object decree){ this.decree = decree; }
-
-//        private int getType() { return type; } // GemStoneAddition
-        private void setType(int type) { this.type = type; }
-        private boolean isType(int type) { return this.type == type; }
-
-        public boolean isPrepare() { return isType(PREPARE); }
-        public boolean isCommit() { return isType(COMMIT); }
-        public boolean isAbort() { return isType(ABORT); }
-
-        public void commit() { setType(COMMIT); }
-        public void abort() { setType(ABORT); }
-
-        /** @return the string form of the wrapped decree ("null" when there is none) */
-        @Override // GemStoneAddition
-        public String toString() { return String.valueOf(decree); }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingListener.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingListener.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingListener.java
deleted file mode 100644
index 2566d23..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/TwoPhaseVotingListener.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-/**
- * Implementations of this interface can participate in a two-phase voting process.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- */
-public interface TwoPhaseVotingListener {
- /**
- * Votes on whether the decree is acceptable to this party.
- * @param decree the decree being voted on
- * @return <code>true</code> if the decree is acceptable.
- * @throws VoteException if the decree type is unknown or the listener does not want to vote on it.
- */
- boolean prepare(Object decree) throws VoteException;
-
- /**
- * Votes on committing the decree.
- * @param decree the decree being voted on
- * @return <code>true</code> if the decree is committed.
- * @throws VoteException if the decree type is unknown or the listener does not want to vote on it.
- */
- boolean commit(Object decree) throws VoteException;
-
- /**
- * Unconditionally aborts the previous voting on the given decree.
- * @throws VoteException if the listener ignores the abort.
- */
- void abort(Object decree) throws VoteException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/UpdateException.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/UpdateException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/UpdateException.java
deleted file mode 100644
index 7081cf0..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/UpdateException.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: UpdateException.java,v 1.1.1.1 2003/09/09 01:24:08 belaban Exp $
-
-
-package com.gemstone.org.jgroups.blocks;
-
-
-/** Checked exception (extends {@link Exception}) carrying a detail message; presumably signals a failed update in this package - TODO confirm against callers. */
-public class UpdateException extends Exception {
-private static final long serialVersionUID = -4828148927982066195L;
- /** Creates the exception with the given detail message, passed unchanged to {@link Exception}. */
- public UpdateException(String msg) {
- super(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteException.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteException.java
deleted file mode 100644
index 57d28ab..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteException.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.ChannelException;
-
-/**
- * This exception is thrown when a voting listener cannot vote on the
- * specified decree.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- */
-public class VoteException extends ChannelException {
-private static final long serialVersionUID = -741925330540432706L;
- /** Creates the exception with the given detail message, forwarded unchanged to {@link ChannelException}. */
- public VoteException(String msg) { super(msg); }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteResponseProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteResponseProcessor.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteResponseProcessor.java
deleted file mode 100644
index c3d79b3..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/VoteResponseProcessor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-
-import com.gemstone.org.jgroups.ChannelException;
-import com.gemstone.org.jgroups.util.RspList;
-
-
-/**
- * VoteResponseProcessor
- * Applications that use the VotingAdapter and/or TwoPhaseVotingAdapter can pass an implementation of this down the vote
- * calls, to intercept processing of the VoteResults returned by other nodes.
- * See the source of {@link com.gemstone.org.jgroups.blocks.DistributedLockManager} for an example implementation.
- *
- * @author Robert Schaffar-Taurok (robert@fusion.at)
- * @version $Id: VoteResponseProcessor.java,v 1.2 2005/07/17 11:36:40 chrislott Exp $
- */
-public interface VoteResponseProcessor {
- /**
- * Processes the responses returned by the other nodes.
- * @param responses The responses
- * @param consensusType The consensusType of the vote (presumably one of the VotingAdapter consensus constants - TODO confirm)
- * @param decree The vote decree
- * @return the processor's verdict on the responses (presumably whether the vote succeeded - TODO confirm against VotingAdapter)
- * @throws ChannelException presumably when the responses cannot be processed - TODO confirm against implementations
- */
- public boolean processResponses(RspList responses, int consensusType, Object decree) throws ChannelException;
-}