You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:45 UTC
[21/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java
deleted file mode 100644
index 35dccca..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-import java.util.*;
-
-/**
- * Provides various stats
- * @author Bela Ban
- * @version $Id: STATS.java,v 1.2 2005/06/07 10:17:27 belaban Exp $
- */
-public class STATS extends Protocol {
- long sent_msgs, sent_bytes, sent_ucasts, sent_mcasts, received_ucasts, received_mcasts;
- long received_msgs, received_bytes, sent_ucast_bytes, sent_mcast_bytes, received_ucast_bytes, received_mcast_bytes;
-
- /** HashMap key=Address, value=Entry, maintains stats per target destination */
- HashMap sent=new HashMap();
-
- /** HashMap key=Address, value=Entry, maintains stats per receiver */
- HashMap received=new HashMap();
-
- static/*GemStoneAddition*/ final short UP=1;
- static/*GemStoneAddition*/ final short DOWN=2;
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "STATS";
- }
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- super.setProperties(props);
- down_thread=false; // never use a down thread
- up_thread=false; // never use an up thread
-
- if(props.size() > 0) {
- log.error(ExternalStrings.STATS_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
- return false;
- }
- return true;
- }
-
- @Override // GemStoneAddition
- public void resetStats() {
- sent_msgs=sent_bytes=sent_ucasts=sent_mcasts=received_ucasts=received_mcasts=0;
- received_msgs=received_bytes=sent_ucast_bytes=sent_mcast_bytes=received_ucast_bytes=received_mcast_bytes=0;
- sent.clear();
- received.clear();
- }
-
-
- public long getSentMessages() {return sent_msgs;}
- public long getSentBytes() {return sent_bytes;}
- public long getSentUnicastMessages() {return sent_ucasts;}
- public long getSentUnicastBytes() {return sent_ucast_bytes;}
- public long getSentMcastMessages() {return sent_mcasts;}
- public long getSentMcastBytes() {return sent_mcast_bytes;}
-
- public long getReceivedMessages() {return received_msgs;}
- public long getReceivedBytes() {return received_bytes;}
- public long getReceivedUnicastMessages() {return received_ucasts;}
- public long getReceivedUnicastBytes() {return received_ucast_bytes;}
- public long getReceivedMcastMessages() {return received_mcasts;}
- public long getReceivedMcastBytes() {return received_mcast_bytes;}
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- if(evt.getType() == Event.MSG) {
- Message msg=(Message)evt.getArg();
- updateStats(msg, UP);
- }
- else if(evt.getType() == Event.VIEW_CHANGE) {
- handleViewChange((View)evt.getArg());
- }
- passUp(evt);
- }
-
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- if(evt.getType() == Event.MSG) {
- Message msg=(Message)evt.getArg();
- updateStats(msg, DOWN);
- }
- else if(evt.getType() == Event.VIEW_CHANGE) {
- handleViewChange((View)evt.getArg());
- }
- passDown(evt);
- }
-
-
- @Override // GemStoneAddition
- public String printStats() {
- Map.Entry entry;
- Object key, val;
- StringBuffer sb=new StringBuffer();
- sb.append("sent:\n");
- for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- key=entry.getKey();
- if(key == null) key="<mcast dest>";
- val=entry.getValue();
- sb.append(key).append(": ").append(val).append("\n");
- }
- sb.append("\nreceived:\n");
- for(Iterator it=received.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- key=entry.getKey();
- val=entry.getValue();
- sb.append(key).append(": ").append(val).append("\n");
- }
-
- return sb.toString();
- }
-
- private void handleViewChange(View view) {
- Vector members=view.getMembers();
- Set tmp=new LinkedHashSet(members);
- tmp.add(null); // for null destination (= mcast)
- sent.keySet().retainAll(tmp);
- received.keySet().retainAll(tmp);
- }
-
- private void updateStats(Message msg, short direction) {
- int length;
- HashMap map;
- boolean mcast;
- Address dest, src;
-
- if(msg == null) return;
- length=msg.getLength();
- dest=msg.getDest();
- src=msg.getSrc();
- mcast=dest == null || dest.isMulticastAddress();
-
- if(direction == UP) { // received
- received_msgs++;
- received_bytes+=length;
- if(mcast) {
- received_mcasts++;
- received_mcast_bytes+=length;
- }
- else {
- received_ucasts++;
- received_ucast_bytes+=length;
- }
- }
- else { // sent
- sent_msgs++;
- sent_bytes+=length;
- if(mcast) {
- sent_mcasts++;
- sent_mcast_bytes+=length;
- }
- else {
- sent_ucasts++;
- sent_ucast_bytes+=length;
- }
- }
-
- Address key=direction == UP? src : dest;
- map=direction == UP? received : sent;
- Entry entry=(Entry)map.get(key);
- if(entry == null) {
- entry=new Entry();
- map.put(key, entry);
- }
- entry.msgs++;
- entry.bytes+=length;
- if(mcast) {
- entry.mcasts++;
- entry.mcast_bytes+=length;
- }
- else {
- entry.ucasts++;
- entry.ucast_bytes+=length;
- }
- }
-
-
-
-
- static class Entry {
- long msgs, bytes, ucasts, mcasts, ucast_bytes, mcast_bytes;
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer sb=new StringBuffer();
- sb.append(msgs).append(" (").append(bytes).append(" bytes)");
- sb.append(": ").append(ucasts).append(" ucasts (").append(ucast_bytes).append(" bytes), ");
- sb.append(mcasts).append(" mcasts (").append(mcast_bytes).append(" bytes)");
- return sb.toString();
- }
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java
deleted file mode 100644
index cf50d36..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: TCP.java,v 1.31 2005/09/29 12:24:37 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.SuspectMember;
-import com.gemstone.org.jgroups.blocks.ConnectionTable;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.BoundedList;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-
-/**
- * TCP based protocol. Creates a server socket, which gives us the local address of this group member. For
- * each accept() on the server socket, a new thread is created that listens on the socket.
- * For each outgoing message m, if m.dest is in the ougoing hashtable, the associated socket will be reused
- * to send message, otherwise a new socket is created and put in the hashtable.
- * When a socket connection breaks or a member is removed from the group, the corresponding items in the
- * incoming and outgoing hashtables will be removed as well.<br>
- * This functionality is in ConnectionTable, which isT used by TCP. TCP sends messages using ct.send() and
- * registers with the connection table to receive all incoming messages.
- * @author Bela Ban
- */
-public class TCP extends TP implements ConnectionTable.Receiver {
- private ConnectionTable ct=null;
- private InetAddress external_addr=null; // the IP address which is broadcast to other group members
- private int start_port=7800; // find first available port starting at this port
- private int end_port=0; // maximum port to bind to
- private long reaper_interval=0; // time in msecs between connection reaps
- private long conn_expire_time=0; // max time a conn can be idle before being reaped
-
- /** List the maintains the currently suspected members. This is used so we don't send too many SUSPECT
- * events up the stack (one per message !)
- */
- final BoundedList suspected_mbrs=new BoundedList(20);
-
- /** Should we drop unicast messages to suspected members or not */
- boolean skip_suspected_members=true;
-
- /** Use separate send queues for each connection */
- boolean use_send_queues=true;
-
- int recv_buf_size=150000;
- int send_buf_size=150000;
- int sock_conn_timeout=2000; // max time in millis for a socket creation in ConnectionTable
-
-
-
- public TCP() {
- }
-
- @Override // GemStoneAddition
- public String getName() {
- return "TCP";
- }
-
-
- public int getOpenConnections() {return ct.getNumConnections();}
- public InetAddress getBindAddr() {return bind_addr;}
- public void setBindAddr(InetAddress bind_addr) {this.bind_addr=bind_addr;}
- public int getStartPort() {return start_port;}
- public void setStartPort(int start_port) {this.start_port=start_port;}
- public int getEndPort() {return end_port;}
- public void setEndPort(int end_port) {this.end_port=end_port;}
- public long getReaperInterval() {return reaper_interval;}
- public void setReaperInterval(long reaper_interval) {this.reaper_interval=reaper_interval;}
- public long getConnExpireTime() {return conn_expire_time;}
- public void setConnExpireTime(long conn_expire_time) {this.conn_expire_time=conn_expire_time;}
- @Override // GemStoneAddition
- public boolean isLoopback() {return loopback;}
- @Override // GemStoneAddition
- public void setLoopback(boolean loopback) {this.loopback=loopback;}
-
-
- public String printConnections() {return ct.toString();}
-
-
- /** Setup the Protocol instance acording to the configuration string */
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("start_port");
- if(str != null) {
- start_port=Integer.parseInt(str);
- props.remove("start_port");
- }
-
- str=props.getProperty("end_port");
- if(str != null) {
- end_port=Integer.parseInt(str);
- props.remove("end_port");
- }
-
- str=props.getProperty("external_addr");
- if(str != null) {
- try {
- external_addr=InetAddress.getByName(str);
- }
- catch(UnknownHostException unknown) {
- if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known");
- return false;
- }
- props.remove("external_addr");
- }
-
- str=props.getProperty("reaper_interval");
- if(str != null) {
- reaper_interval=Long.parseLong(str);
- props.remove("reaper_interval");
- }
-
- str=props.getProperty("conn_expire_time");
- if(str != null) {
- conn_expire_time=Long.parseLong(str);
- props.remove("conn_expire_time");
- }
-
- str=props.getProperty("sock_conn_timeout");
- if(str != null) {
- sock_conn_timeout=Integer.parseInt(str);
- props.remove("sock_conn_timeout");
- }
-
- str=props.getProperty("recv_buf_size");
- if(str != null) {
- recv_buf_size=Integer.parseInt(str);
- props.remove("recv_buf_size");
- }
-
- str=props.getProperty("send_buf_size");
- if(str != null) {
- send_buf_size=Integer.parseInt(str);
- props.remove("send_buf_size");
- }
-
- str=props.getProperty("skip_suspected_members");
- if(str != null) {
- skip_suspected_members=Boolean.valueOf(str).booleanValue();
- props.remove("skip_suspected_members");
- }
-
- str=props.getProperty("use_send_queues");
- if(str != null) {
- use_send_queues=Boolean.valueOf(str).booleanValue();
- props.remove("use_send_queues");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.TCP_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
- return false;
- }
- return true;
- }
-
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port);
- ct.setUseSendQueues(use_send_queues);
- // ct.addConnectionListener(this);
- ct.setReceiveBufferSize(recv_buf_size);
- ct.setSendBufferSize(send_buf_size);
- ct.setSocketConnectionTimeout(sock_conn_timeout);
- local_addr=ct.getLocalAddress();
- if(additional_data != null && local_addr instanceof IpAddress)
- ((IpAddress)local_addr).setAdditionalData(additional_data);
- super.start();
- }
-
- @Override // GemStoneAddition
- public void stop() {
- ct.stop();
- super.stop();
- }
-
-
- @Override // GemStoneAddition
- protected void handleDownEvent(Event evt) {
- super.handleDownEvent(evt);
- if(evt.getType() == Event.VIEW_CHANGE) {
- suspected_mbrs.removeAll();
- }
- else if(evt.getType() == Event.UNSUSPECT) {
- suspected_mbrs.removeElement(evt.getArg());
- }
- }
-
-
- /**
- * @param reaperInterval
- * @param connExpireTime
- * @param bindAddress
- * @param startPort
- * @throws Exception
- * @return ConnectionTable
- * Sub classes overrides this method to initialize a different version of
- * ConnectionTable.
- */
- protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress bindAddress,
- InetAddress externalAddress, int startPort, int endPort) throws Exception {
- ConnectionTable cTable;
- if(reaperInterval == 0 && connExpireTime == 0) {
- cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort);
- }
- else {
- if(reaperInterval == 0) {
- reaperInterval=5000;
- if(warn) log.warn("reaper_interval was 0, set it to " + reaperInterval);
- }
- if(connExpireTime == 0) {
- connExpireTime=1000 * 60 * 5;
- if(warn) log.warn("conn_expire_time was 0, set it to " + connExpireTime);
- }
- cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort,
- reaperInterval, connExpireTime);
- }
- return cTable;
- }
-
-
- /** ConnectionTable.Receiver interface */
- public void receive(Address sender, byte[] data, int offset, int length) {
- super.receive(local_addr, sender, data, offset, length);
- }
-
-
-
-
- @Override // GemStoneAddition
- public void sendToAllMembers(byte[] data, int offset, int length) throws Exception {
- Address dest;
- Vector mbrs=(Vector)members.clone();
- for(int i=0; i < mbrs.size(); i++) {
- dest=(Address)mbrs.elementAt(i);
- sendToSingleMember(dest, false, data, offset, length);
- }
- }
-
- @Override // GemStoneAddition
- public void sendToSingleMember(Address dest, boolean isJoinResponse/*temporary change - do not commit*/, byte[] data, int offset, int length) throws Exception {
- if(trace) log.trace("dest=" + dest + " (" + data.length + " bytes)");
- if(skip_suspected_members) {
- if(suspected_mbrs.contains(dest)) {
- if(trace)
- log.trace("will not send unicast message to " + dest + " as it is currently suspected");
- return;
- }
- }
-
-// if(dest.equals(local_addr)) {
-// if(!loopback) // if loopback, we discard the message (was already looped back)
-// receive(dest, data, offset, length); // else we loop it back here
-// return;
-// }
- try {
- ct.send(dest, data, offset, length);
- }
- catch(Exception e) {
- if(members.contains(dest)) {
- if(!suspected_mbrs.contains(dest)) {
- suspected_mbrs.add(dest);
- passUp(new Event(Event.SUSPECT, new SuspectMember(local_addr, dest))); // GemStoneAddition SuspectMember
- }
- }
- }
- }
-
-
- @Override // GemStoneAddition
- public String getInfo() {
- StringBuffer sb=new StringBuffer();
- sb.append("connections: ").append(printConnections()).append("\n");
- return sb.toString();
- }
-
-
- @Override // GemStoneAddition
- public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) {
- if(multicast)
- msg.setDest(null);
- else
- msg.setDest(dest);
- }
-
- @Override // GemStoneAddition
- public void postUnmarshallingList(Message msg, Address dest, boolean multicast) {
- postUnmarshalling(msg, dest, null, multicast);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java
deleted file mode 100644
index 91650af..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: TCPGOSSIP.java,v 1.16 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.JChannel;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.GossipClient;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-
-/**
- * The TCPGOSSIP protocol layer retrieves the initial membership (used by the GMS when started
- * by sending event FIND_INITIAL_MBRS down the stack).
- * We do this by contacting one or more GossipServers, which must be running at well-known
- * addresses:ports. The responses should allow us to determine the coordinator whom we have to
- * contact, e.g. in case we want to join the group. When we are a server (after having
- * received the BECOME_SERVER event), we'll respond to TCPGOSSIP requests with a TCPGOSSIP
- * response.<p> The FIND_INITIAL_MBRS event will eventually be answered with a
- * FIND_INITIAL_MBRS_OK event up the stack.
- *
- * @author Bela Ban
- */
-public class TCPGOSSIP extends Discovery {
- Vector initial_hosts=null; // (list of IpAddresses) hosts to be contacted for the initial membership
- GossipClient gossip_client=null; // accesses the GossipServer(s) to find initial mbrship
-
- // we need to refresh the registration with the GossipServer(s) periodically,
- // so that our entries are not purged from the cache
- long gossip_refresh_rate=20000;
-
- private boolean splitBrainDetectionEnabled; // GemStoneAddition
- private int gossipServerWaitTime; // GemStoneAddition
-
- final static Vector EMPTY_VECTOR=new Vector();
- final static String name="TCPGOSSIP";
-
-
- @Override // GemStoneAddition
- public String getName() {
- return name;
- }
-
- // start GemStoneAddition
- @Override // GemStoneAddition
- public int getProtocolEnum() {
- return com.gemstone.org.jgroups.stack.Protocol.enumTCPGOSSIP;
- }
- // end GemStone addition
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
- str=props.getProperty("gossip_refresh_rate"); // wait for at most n members
- if(str != null) {
- gossip_refresh_rate=Integer.parseInt(str);
- props.remove("gossip_refresh_rate");
- }
-
- //GemStoneAddition - split-brain detection support
- str=props.getProperty("split-brain-detection");
- if (str != null) {
- splitBrainDetectionEnabled = Boolean.valueOf(str).booleanValue();
- props.remove("split-brain-detection");
- }
-
- str=props.getProperty("initial_hosts");
- if(str != null) {
- props.remove("initial_hosts");
- initial_hosts=createInitialHosts(str);
- }
-
- str = props.getProperty("gossip_server_wait_time");
- if (str != null) {
- props.remove("gossip_server_wait_time");
- gossipServerWaitTime = Integer.parseInt(str);
- }
-
- if(initial_hosts == null || initial_hosts.size() == 0) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.TCPGOSSIP_INITIAL_HOSTS_MUST_CONTAIN_THE_ADDRESS_OF_AT_LEAST_ONE_GOSSIPSERVER);
- return false;
- }
- return super.setProperties(props);
- }
-
-
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- super.start();
- if(gossip_client == null) {
- gossip_client=new GossipClient(initial_hosts, gossip_refresh_rate, this.stack);
- gossip_client.setTimeout((int)this.timeout);
- }
- }
-
- @Override // GemStoneAddition
- public void stop() {
- super.stop();
- if(gossip_client != null) {
- gossip_client.stop();
- //gossip_client=null;
- }
- }
-
-
- @Override // GemStoneAddition
- public void handleConnectOK() {
- if(group_addr == null || local_addr == null) {
- if(log.isErrorEnabled())
- log.error("[CONNECT_OK]: group_addr or local_addr is null. " +
- "cannot register with GossipServer(s)");
- }
- else {
- gossip_client.register(group_addr, local_addr, timeout, true); // GemStone - timeout, stack & inhibit registration
- }
- }
-
-
- private boolean ipWarningIssued; // GemStoneAddition - IP version checking
-
- @Override // GemStoneAddition
- public void sendGetMembersRequest(AtomicBoolean waiter_sync) { // GemStoneAddition - both parameters
- Message msg, copy;
- PingHeader hdr;
- Vector tmp_mbrs;
- Address mbr_addr;
- GossipClient client = gossip_client; // GemStoneAddition - gossip_client gets nulled when this proto is stopped
-
- // bug #41484 - only use coordinator advice from the gossip server once
- boolean shortcutOK = !this.stack.hasTriedJoinShortcut();
-
- if(group_addr == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.TCPGOSSIP_FIND_INITIAL_MBRS_GROUP_ADDR_IS_NULL_CANNOT_GET_MBRSHIP);
- passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR));
- return;
- }
- if(trace) log.trace("fetching members from GossipServer(s)");
-
- // GemStoneAddition - bug 28965: don't allow startup if no gossip server
- boolean isAdminOnly = stack.gfPeerFunctions.isAdminOnlyMember();
- //do { GemStone - see comment below
-
- if (gossip_client == null)
- return;
-
- long giveUpTime = System.currentTimeMillis() + (this.gossipServerWaitTime * 1000L);
-
- tmp_mbrs=client.getMembers(group_addr, local_addr, true, this.timeout); // GemStoneAddition - send local addr on get
-
- boolean firstWait = true;
- boolean startupStatusWaitingSet = false;
-
-// if (isAdminOnly) { // GemStoneAddition - this if-else block added
- while (gossip_client != null && client.getResponsiveServerCount() == 0 || tmp_mbrs == null || tmp_mbrs.size() == 0) {
- // Wait, until we can contact at least one of our
- // gossip servers and it had someone register with it
- if (!isAdminOnly && System.currentTimeMillis() >= giveUpTime) {
- break;
- }
- if (firstWait) {
- StringBuilder sb = new StringBuilder(100);
- for (Object obj: this.initial_hosts) {
- if (!firstWait) {
- sb.append(',');
- }
- firstWait = false;
- IpAddress addr = (IpAddress)obj;
- sb.append(addr.getIpAddress().getHostName())
- .append('[')
- .append(addr.getPort())
- .append(']');
- }
- // inform gfsh / ServerLauncher
- startupStatusWaitingSet = true;
- stack.gfPeerFunctions.logStartup(ExternalStrings.WAITING_FOR_LOCATOR_TO_START,sb.toString());
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- return; // GemStoneAddition
- }
- tmp_mbrs=client.getMembers(group_addr, local_addr, true, timeout);
- // GemStone Addition 08-04-04
- // if the VM is exiting, return so that the distributed system
- // sync can be released and the shutdown hook can do its job
- if (stack.gfPeerFunctions.shutdownHookIsAlive()) {
- throw stack.gfBasicFunctions.getGemFireConfigException("Unable to contact a Locator service before detecting that VM is exiting");
- }
- }
-// } else {
-// if (gossip_client == null) GemStoneAddition (this is never null)
-// return;
- if (client.getResponsiveServerCount() == 0) {
- RuntimeException re = stack.gfBasicFunctions.getGemFireConfigException("Unable to contact a Locator service. Operation either timed out or Locator does not exist. Configured list of locators is \"" + initial_hosts + "\".");
- throw re;
- }
-// }
-
- if (startupStatusWaitingSet) {
- stack.gfPeerFunctions.logStartup(ExternalStrings.WAITING_FOR_LOCATOR_TO_START_COMPLETED);
- }
- Set<Address> serverAddresses = client.getServerAddresses();
-
- if (client.getFloatingCoordinatorDisabled()) {
- passUp(new Event(Event.FLOATING_COORDINATOR_DISABLED, null));
- }
-
- if (client.getNetworkPartitionDetectionEnabled() != splitBrainDetectionEnabled) {
- if (!splitBrainDetectionEnabled) {
- splitBrainDetectionEnabled = true;
- passUp(new Event(Event.ENABLE_NETWORK_PARTITION_DETECTION));
- } else {
- throw stack.gfBasicFunctions.getGemFireConfigException("Locator has enable-network-partition-detection="
- + client.getNetworkPartitionDetectionEnabled()
- +" but this member has enable-network-partition-detection="
- + splitBrainDetectionEnabled);
- }
- }
-
- if (client.getNetworkPartitionDetectionEnabled()) {
- stack.gfBasicFunctions.checkDisableDNS();
- }
-
-
- // GemStoneAddition for bug 39220 see if we're using an incompatible
- // version of IP
- if (tmp_mbrs != null && !ipWarningIssued) {
- TP protocol = (TP)stack.findProtocol("UDP");
- if (protocol == null) protocol = (TP)stack.findProtocol("TCP");
- InetAddress bindAddress = protocol.getInetBindAddress();
- if (bindAddress != null) {
- boolean iAmIPv4 = (bindAddress instanceof Inet4Address);
- for (int i=0; i<tmp_mbrs.size(); i++) {
- IpAddress addr = (IpAddress)tmp_mbrs.get(i);
- InetAddress iaddr = addr.getIpAddress();
- if (iAmIPv4 != (iaddr instanceof Inet4Address)) {
- // incompatible addresses are being used
- log.getLogWriter().warning(
- ExternalStrings.TCPGOSSIP_IP_VERSION_MISMATCH);
- ipWarningIssued = true;
- break;
- }
- }
- }
- }
-
- serverAddresses.remove(this.local_addr);
- this.ping_waiter.setRequiredResponses(serverAddresses);
-
- // GemStoneAddition - if no locators have distributed systems,
- // tell the GMS that it's okay for it to become a coordinator
- if (client.getServerDistributedSystemCount() == 0) {
- passUp(new Event(Event.ENABLE_INITIAL_COORDINATOR, null));
- }
-
- // GemStoneAddition - shortcut the get_mbrs phase
- if (shortcutOK) {
- Address coordinator = client.getCoordinator();
- // if this is a Locator starting up and there are no other processes
- // in the system we can bypass discovery
-
- // disabled: this allows a locator that's starting up to ignore concurrently
- // starting locators. bug #30341 is fixed by requiring responses from
- // all known locators during discovery, and this code messes that up
-// if (coordinator == null && Locator.hasLocators()
-// && tmp_mbrs.size() == 0
-// || (tmp_mbrs.size() == 1 && tmp_mbrs.get(0).equals(this.local_addr))) {
-// coordinator = this.local_addr;
-// }
- if (coordinator != null) {
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("Locator returned coordinator " + coordinator +
- ", so bypassing unicast discovery processing");
- }
- ping_waiter.setCoordinator(coordinator);
- wakeWaiter(waiter_sync);
- return;
- }
- }
-
- if(tmp_mbrs == null || tmp_mbrs.size() == 0) {
- if(trace) log.trace("[FIND_INITIAL_MBRS]: gossip client found no members");
- passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR));
- wakeWaiter(waiter_sync); // GemStoneAddition
- return;
- }
- if(trace) {
- log.trace("consolidated mbrs from GossipServer(s) are " + tmp_mbrs
- + ". Locator distributed system count=" + client.getServerDistributedSystemCount()
- + ", and floatingCoordinationDisabled="+client.getFloatingCoordinatorDisabled());
- }
-
- // GemStoneAddition - forces us to not get any initial member responses & tests the
- // disable_initial_coordinator setting
- //if (true) {
- // log.info("DEBUG: not sending GET_MBRS_REQ message to list returned by gossip server");
- // return;
- //}
-
- // 1. 'Mcast' GET_MBRS_REQ message
- hdr=new PingHeader(PingHeader.GET_MBRS_REQ, null);
- msg=new Message(null, null, null);
- msg.putHeader(name, hdr);
- //GemStoneAddition - don't bundle this message or we might time out
- // before it's even sent
- msg.bundleable = false;
-
- wakeWaiter(waiter_sync); // GemStoneAddition
-
- // GemStoneAddition - here we send the request to newer members first
- // since they're likely to be around.
- int max_msgs = Integer.getInteger("gemfire.max_ping_requests", 40).intValue();
- int msgs_sent = 0;
- for(int i=tmp_mbrs.size()-1; i >= 0; i--) {
- mbr_addr=(Address)tmp_mbrs.elementAt(i);
- // make sure all required responders get the message
- if (!serverAddresses.contains(mbr_addr) && (msgs_sent >= max_msgs)) {
- continue;
- }
- copy=msg.copy();
- copy.setDest(mbr_addr);
- if(trace) log.trace("[FIND_INITIAL_MBRS] sending PING request to " + copy.getDest());
- passDown(new Event(Event.MSG, copy));
- if (Thread.currentThread().isInterrupted()) {
- break;
- }
- msgs_sent++;
- }
-
- // GemStoneAddition - not really an addition, just a note from Bruce
- // that this used to have a wait-for-initial-members section that is
- // now gone, making the loop a bit difficult to implement
- //} while (isAdminOnly && initial_members.size() <= 0);
-
- }
-
-
-
- /* -------------------------- Private methods ---------------------------- */
-
-
- /**
- * Input is "daddy[8880],sindhu[8880],camille[5555]. Return list of IpAddresses
- */
- public static Vector createInitialHosts(String l) {
- Vector tmp=new Vector();
- String host;
- int port;
- IpAddress addr;
- StringTokenizer tok=new StringTokenizer(l, ",");
- String t;
- boolean isLoopback = false;
- InetAddress myAddress = null;
-
- String bindAddress = System.getProperty("gemfire.jg-bind-address");
- try {
- if (bindAddress == null) {
- isLoopback = JChannel.getGfFunctions().getLocalHost().isLoopbackAddress();
- } else {
- isLoopback = InetAddress.getByName(bindAddress).isLoopbackAddress();
- }
- } catch (UnknownHostException e) {
- // ignore
- }
-
-
-
- while(tok.hasMoreTokens()) {
- try {
- t=tok.nextToken();
- host=t.substring(0, t.indexOf('['));
- // GemStoneAddition - support for name:bind-addr[port] format
- int idx = host.lastIndexOf('@');
- if (idx < 0) {
- idx = host.lastIndexOf(':');
- }
- String h = host.substring(0, idx > -1 ? idx : host.length());
- if (h.indexOf(':') >= 0) { // a single numeric ipv6 address
- idx = host.lastIndexOf('@');
- }
- if (idx >= 0) {
- host = host.substring(idx+1, host.length());
- }
- port=Integer.parseInt(t.substring(t.indexOf('[') + 1, t.indexOf(']')));
- addr=new IpAddress(host, port);
- if (isLoopback && !addr.getIpAddress().isLoopbackAddress()) { // GemStoneAddition
- // TODO this should be a GemFireConfigException but that class isn't available
- // in a static method in the jgroups project
- throw new RuntimeException("This process is attempting to join with a loopback address ("+myAddress+") using a locator that does not have a local address ("+addr.getIpAddress()+"). On Unix this usually means that /etc/hosts is misconfigured.");
- }
- tmp.addElement(addr);
- }
- catch(NumberFormatException e) {
- //if(log.isErrorEnabled()) log.error(JGroupsStrings.TCPGOSSIP_EXEPTION_IS__0, e);
- }
- }
-
- return tmp;
- }
-
- @Override // GemStoneAddition
- public void destroy() { // GemStoneAddition - get rid of gossip timer
- if (gossip_client != null) {
- gossip_client.destroy();
- gossip_client = null;
- }
- }
-
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java
deleted file mode 100644
index 6459d49..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: TCPPING.java,v 1.24 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-
-/**
- * The TCPPING protocol layer retrieves the initial membership in answer to the GMS's
- * FIND_INITIAL_MBRS event. The initial membership is retrieved by directly contacting other group
- * members, sending point-to-point mebership requests. The responses should allow us to determine
- * the coordinator whom we have to contact in case we want to join the group. When we are a server
- * (after having received the BECOME_SERVER event), we'll respond to TCPPING requests with a TCPPING
- * response.
- * <p>
- * The FIND_INITIAL_MBRS event will eventually be answered with a FIND_INITIAL_MBRS_OK event up
- * the stack.
- * <p>
- * The TCPPING protocol requires a static conifiguration, which assumes that you to know in advance
- * where to find other members of your group. For dynamic discovery, use the PING protocol, which
- * uses multicast discovery, or the TCPGOSSIP protocol, which contacts a Gossip Router to acquire
- * the initial membership.
- *
- * @author Bela Ban
- */
-public class TCPPING extends Discovery {
- int port_range=1; // number of ports to be probed for initial membership
-
- /** List of IpAddress */
- ArrayList initial_hosts=null; // hosts to be contacted for the initial membership
- final static String name="TCPPING";
-
-
-
- @Override // GemStoneAddition
- public String getName() {
- return name;
- }
-
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- str=props.getProperty("port_range"); // if member cannot be contacted on base port,
- if(str != null) { // how many times can we increment the port
- port_range=Integer.parseInt(str);
- if (port_range < 1) {
- port_range = 1;
- }
- props.remove("port_range");
- }
-
- str=props.getProperty("initial_hosts");
- if(str != null) {
- props.remove("initial_hosts");
- initial_hosts=createInitialHosts(str);
- }
-
- return super.setProperties(props);
- }
-
-
- @Override // GemStoneAddition
- public void localAddressSet(Address addr) {
- // Add own address to initial_hosts if not present: we must always be able to ping ourself !
- if(initial_hosts != null && addr != null) {
- if(initial_hosts.contains(addr)) {
- initial_hosts.remove(addr);
- if(log.isDebugEnabled()) log.debug("[SET_LOCAL_ADDRESS]: removing my own address (" + addr +
- ") from initial_hosts; initial_hosts=" + initial_hosts);
- }
- }
- }
-
-
- @Override // GemStoneAddition
- public void sendGetMembersRequest(AtomicBoolean waiter_sync) {
- Message msg;
-
- wakeWaiter(waiter_sync);
-
- for(Iterator it=initial_hosts.iterator(); it.hasNext();) {
- Address addr=(Address)it.next();
- // if(tmpMbrs.contains(addr)) {
- // ; // continue; // changed as suggested by Mark Kopec
- // }
- msg=new Message(addr, null, null);
- msg.putHeader(name, new PingHeader(PingHeader.GET_MBRS_REQ, null));
-
- if(trace) log.trace("[FIND_INITIAL_MBRS] sending PING request to " + msg.getDest());
- passDown(new Event(Event.MSG, msg));
- }
- }
-
-
-
- /* -------------------------- Private methods ---------------------------- */
-
- /**
- * Input is "daddy[8880],sindhu[8880],camille[5555]. Return List of IpAddresses
- */
- private ArrayList createInitialHosts(String l) {
- StringTokenizer tok=new StringTokenizer(l, ",");
- String t;
- IpAddress addr;
- ArrayList retval=new ArrayList();
-
- while(tok.hasMoreTokens()) {
- try {
- t=tok.nextToken();
- String host=t.substring(0, t.indexOf('['));
- int port=Integer.parseInt(t.substring(t.indexOf('[') + 1, t.indexOf(']')));
- for(int i=port; i < port + port_range; i++) {
- addr=new IpAddress(host, i);
- retval.add(addr);
- }
- }
- catch(NumberFormatException e) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.TCPPING_EXEPTION_IS__0, e);
- }
- }
-
- return retval;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java
deleted file mode 100644
index 3c9ba7b..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-import com.gemstone.org.jgroups.blocks.ConnectionTable;
-import com.gemstone.org.jgroups.blocks.ConnectionTableNIO;
-
-import java.net.InetAddress;
-import java.util.Properties;
-
-public class TCP_NIO extends TCP
- {
-
- /*
- * (non-Javadoc)
- *
- * @see org.jgroups.protocols.TCP#getConnectionTable(long, long)
- */
- @Override // GemStoneAddition
- protected ConnectionTable getConnectionTable(long ri, long cet,
- InetAddress b_addr, InetAddress bc_addr, int s_port, int e_port) throws Exception {
- ConnectionTableNIO ct = null;
- if (ri == 0 && cet == 0) {
- ct = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port );
- } else {
- if (ri == 0) {
- ri = 5000;
- if(warn) log.warn("reaper_interval was 0, set it to "
- + ri);
- }
- if (cet == 0) {
- cet = 1000 * 60 * 5;
- if(warn) log.warn("conn_expire_time was 0, set it to "
- + cet);
- }
- ct = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet);
- }
- return ct;
- }
-
- @Override // GemStoneAddition
- public String getName() {
- return "TCP_NIO";
- }
-
- public int getReaderThreads() { return m_reader_threads; }
- public int getWriterThreads() { return m_writer_threads; }
- public int getProcessorThreads() { return m_processor_threads; }
- public int getProcessorMinThreads() { return m_processor_minThreads;}
- public int getProcessorMaxThreads() { return m_processor_maxThreads;}
- public int getProcessorQueueSize() { return m_processor_queueSize; }
- public int getProcessorKeepAliveTime() { return m_processor_keepAliveTime; }
-
- /** Setup the Protocol instance acording to the configuration string */
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- str=props.getProperty("reader_threads");
- if(str != null) {
- m_reader_threads=Integer.parseInt(str);
- props.remove("reader_threads");
- }
-
- str=props.getProperty("writer_threads");
- if(str != null) {
- m_writer_threads=Integer.parseInt(str);
- props.remove("writer_threads");
- }
-
- str=props.getProperty("processor_threads");
- if(str != null) {
- m_processor_threads=Integer.parseInt(str);
- props.remove("processor_threads");
- }
-
- str=props.getProperty("processor_minThreads");
- if(str != null) {
- m_processor_minThreads=Integer.parseInt(str);
- props.remove("processor_minThreads");
- }
-
- str=props.getProperty("processor_maxThreads");
- if(str != null) {
- m_processor_maxThreads =Integer.parseInt(str);
- props.remove("processor_maxThreads");
- }
-
- str=props.getProperty("processor_queueSize");
- if(str != null) {
- m_processor_queueSize=Integer.parseInt(str);
- props.remove("processor_queueSize");
- }
-
- str=props.getProperty("processor_keepAliveTime");
- if(str != null) {
- m_processor_keepAliveTime=Integer.parseInt(str);
- props.remove("processor_keepAliveTime");
- }
-
- return super.setProperties(props);
- }
-
- private int m_reader_threads = 8;
-
- private int m_writer_threads = 8;
-
- private int m_processor_threads = 10; // PooledExecutor.createThreads()
- private int m_processor_minThreads = 10; // PooledExecutor.setMinimumPoolSize()
- private int m_processor_maxThreads = 10; // PooledExecutor.setMaxThreads()
- private int m_processor_queueSize=100; // Number of queued requests that can be pending waiting
- // for a background thread to run the request.
- private int m_processor_keepAliveTime = -1; // PooledExecutor.setKeepAliveTime( milliseconds);
- // A negative value means to wait forever
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java
deleted file mode 100644
index 2622a29..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java
+++ /dev/null
@@ -1,1055 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: TOTAL.java,v 1.11 2005/08/08 12:45:44 belaban Exp $
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.oswego.concurrent.ReadWriteLock;
-import com.gemstone.org.jgroups.oswego.concurrent.WriterPreferenceReadWriteLock;
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.stack.AckSenderWindow;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.TimeScheduler;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.*;
-
-
-/**
- * Implements the total ordering layer using a message sequencer
- * <p/>
- * <p/>
- * The protocol guarantees that all bcast sent messages will be delivered in
- * the same order to all members. For that it uses a sequencer which assignes
- * monotonically increasing sequence ID to broadcasts. Then all group members
- * deliver the bcasts in ascending sequence ID order.
- * <p/>
- * <ul>
- * <li>
- * When a bcast message comes down to this layer, it is placed in the pending
- * down queue. A bcast request is sent to the sequencer.</li>
- * <li>
- * When the sequencer receives a bcast request, it creates a bcast reply
- * message and assigns to it a monotonically increasing seqID and sends it back
- * to the source of the bcast request.</li>
- * <li>
- * When a broadcast reply is received, the corresponding bcast message is
- * assigned the received seqID. Then it is broadcasted.</li>
- * <li>
- * Received bcasts are placed in the up queue. The queue is sorted according
- * to the seqID of the bcast. Any message at the head of the up queue with a
- * seqID equal to the next expected seqID is delivered to the layer above.</li>
- * <li>
- * Unicast messages coming from the layer below are forwarded above.</li>
- * <li>
- * Unicast messages coming from the layer above are forwarded below.</li>
- * </ul>
- * <p/>
- * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages
- * coming from above are discarded!</i> Either the application must stop
- * sending messages when a <code>BLOCK</code> event is received from the
- * channel or a QUEUE layer should be placed above this one. Received messages
- * are still delivered above though.
- * <p/>
- * bcast requests are retransmitted periodically until a bcast reply is
- * received. In case a BCAST_REP is on its way during a BCAST_REQ
- * retransmission, then the next BCAST_REP will be to a non-existing
- * BCAST_REQ. So, a nulll BCAST message is sent to fill the created gap in
- * the seqID of all members.
- *
- * @author i.georgiadis@doc.ic.ac.uk
- */
-public class TOTAL extends Protocol {
- /**
- * The header processed by the TOTAL layer and intended for TOTAL
- * inter-stack communication
- */
- public static class Header extends com.gemstone.org.jgroups.Header {
- // Header types
- /**
- * Null value for the tag
- */
- public static final int NULL_TYPE=-1;
- /**
- * Request to broadcast by the source
- */
- public static final int REQ=0;
- /**
- * Reply to broadcast request.
- */
- public static final int REP=1;
- /**
- * Unicast message
- */
- public static final int UCAST=2;
- /**
- * Broadcast Message
- */
- public static final int BCAST=3;
-
- /**
- * The header's type tag
- */
- public int type;
- /**
- * The ID used by the message source to match replies from the
- * sequencer
- */
- public long localSequenceID;
- /**
- * The ID imposing the total order of messages
- */
- public long sequenceID;
-
- /**
- * used for externalization
- */
- public Header() {
- }
-
- /**
- * Create a header for the TOTAL layer
- *
- * @param type the header's type
- * @param localSeqID the ID used by the sender of broadcasts to match
- * requests with replies from the sequencer
- * @param seqID the ID imposing the total order of messages
- * @throws IllegalArgumentException if the provided header type is
- * unknown
- */
- public Header(int type, long localSeqID, long seqID) {
- super();
- switch(type) {
- case REQ:
- case REP:
- case UCAST:
- case BCAST:
- this.type=type;
- break;
- default:
- this.type=NULL_TYPE;
- throw new IllegalArgumentException("type");
- }
- this.localSequenceID=localSeqID;
- this.sequenceID=seqID;
- }
-
- /**
- * For debugging purposes
- */
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer buffer=new StringBuffer();
- String typeName;
- buffer.append("[TOTAL.Header");
- switch(type) {
- case REQ:
- typeName="REQ";
- break;
- case REP:
- typeName="REP";
- break;
- case UCAST:
- typeName="UCAST";
- break;
- case BCAST:
- typeName="BCAST";
- break;
- case NULL_TYPE:
- typeName="NULL_TYPE";
- break;
- default:
- typeName="";
- break;
- }
- buffer.append(", type=" + typeName);
- buffer.append(", " + "localID=" + localSequenceID);
- buffer.append(", " + "seqID=" + sequenceID);
- buffer.append(']');
-
- return (buffer.toString());
- }
-
- /**
- * Manual serialization
- */
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(type);
- out.writeLong(localSequenceID);
- out.writeLong(sequenceID);
- }
-
- /**
- * Manual deserialization
- */
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- type=in.readInt();
- localSequenceID=in.readLong();
- sequenceID=in.readLong();
- }
- }
-
-
- /**
- * The retransmission listener - It is called by the
- * <code>AckSenderWindow</code> when a retransmission should occur
- */
- private class Command implements AckSenderWindow.RetransmitCommand {
- Command() {
- }
-
- public void retransmit(long seqNo, Message msg) {
- _retransmitBcastRequest(seqNo);
- }
- // GemstoneAddition
- public long getMaxRetransmissionBurst() {
- return 0;
- }
- }
-
-
- /**
- * Protocol name
- */
- private static final String PROT_NAME="TOTAL";
- /**
- * Property names
- */
- private static final String TRACE_PROP="trace";
-
- /**
- * Average time between broadcast request retransmissions
- */
- private final long[] AVG_RETRANSMIT_INTERVAL=new long[]{1000, 2000, 3000, 4000};
-
- /**
- * Null value for the IDs
- */
- private static final long NULL_ID=-1;
- // Layer sending states
- /**
- * No group has been joined yet
- */
- private static final int NULL_STATE=-1;
- /**
- * When set, all messages are sent/received
- */
- private static final int RUN=0;
- /**
- * When set, only session-specific messages are sent/received, i.e. only
- * messages essential to the session's integrity
- */
- private static final int FLUSH=1;
- /**
- * No message is sent to the layer below
- */
- private static final int BLOCK=2;
-
-
- /**
- * The state lock allowing multiple reads or a single write
- */
- private final ReadWriteLock stateLock=new WriterPreferenceReadWriteLock();
- /**
- * Protocol layer message-sending state
- */
- private int state=NULL_STATE;
- /**
- * The address of this stack
- */
- private Address addr=null;
- /**
- * The address of the sequencer
- */
- private Address sequencerAddr=null;
- /**
- * The sequencer's seq ID. The ID of the most recently broadcast reply
- * message
- */
- private long sequencerSeqID=NULL_ID;
- /**
- * The local sequence ID, i.e. the ID sent with the last broadcast request
- * message. This is increased with every broadcast request sent to the
- * sequencer and it's used to match the requests with the sequencer's
- * replies
- */
- private long localSeqID=NULL_ID;
- /**
- * The total order sequence ID. This is the ID of the most recently
- * delivered broadcast message. As the sequence IDs are increasing without
- * gaps, this is used to detect missing broadcast messages
- */
- private long seqID=NULL_ID;
- /**
- * The list of unanswered broadcast requests to the sequencer. The entries
- * are stored in increasing local sequence ID, i.e. in the order they were
- * <p/>
- * sent localSeqID -> Broadcast msg to be sent.
- */
- private SortedMap reqTbl;
- /**
- * The list of received broadcast messages that haven't yet been delivered
- * to the layer above. The entries are stored in increasing sequence ID,
- * i.e. in the order they must be delivered above
- * <p/>
- * seqID -> Received broadcast msg
- */
- private SortedMap upTbl;
- /**
- * Retranmitter for pending broadcast requests
- */
- private AckSenderWindow retransmitter;
-
-
- /**
- * Print addresses in host_ip:port form to bypass DNS
- */
- private String _addrToString(Object addr) {
- return (
- addr == null ? "<null>" :
- ((addr instanceof com.gemstone.org.jgroups.stack.IpAddress) ?
- (((com.gemstone.org.jgroups.stack.IpAddress)addr).getIpAddress(
- ).getHostAddress() + ':' +
- ((com.gemstone.org.jgroups.stack.IpAddress)addr).getPort()) :
- addr.toString())
- );
- }
-
-
- /**
- * @return this protocol's name
- */
- private String _getName() {
- return (PROT_NAME);
- }
-
- /**
- * Configure the protocol based on the given list of properties
- *
- * @param properties the list of properties to use to setup this layer
- * @return false if there was any unrecognized property or a property with
- * an invalid value
- */
- private boolean _setProperties(Properties properties) {
- String value;
-
- // trace
- // Parse & remove property but ignore it; use Trace.trace instead
- value=properties.getProperty(TRACE_PROP);
- if(value != null) properties.remove(TRACE_PROP);
- if(properties.size() > 0) {
- if(log.isErrorEnabled())
- log.error("The following properties are not " +
- "recognized: " + properties.toString());
- return (false);
- }
- return (true);
- }
-
- /**
- * Events that some layer below must handle
- *
- * @return the set of <code>Event</code>s that must be handled by some layer
- * below
- */
- Vector _requiredDownServices() {
- Vector services=new Vector();
-
- return (services);
- }
-
- /**
- * Events that some layer above must handle
- *
- * @return the set of <code>Event</code>s that must be handled by some
- * layer above
- */
- Vector _requiredUpServices() {
- Vector services=new Vector();
-
- return (services);
- }
-
-
- /**
- * Extract as many messages as possible from the pending up queue and send
- * them to the layer above
- */
- private void _deliverBcast() {
- Message msg;
- Header header;
-
- synchronized(upTbl) {
- while((msg=(Message)upTbl.remove(Long.valueOf(seqID + 1))) != null) {
- header=(Header)msg.removeHeader(getName());
- if(header.localSequenceID != NULL_ID) passUp(new Event(Event.MSG, msg));
- ++seqID;
- }
- } // synchronized(upTbl)
- }
-
-
- /**
- * Add all undelivered bcasts sent by this member in the req queue and then
- * replay this queue
- */
- private void _replayBcast() {
- Iterator it;
- Message msg;
- Header header;
-
- // i. Remove all undelivered bcasts sent by this member and place them
- // again in the pending bcast req queue
-
- synchronized(upTbl) {
- if(upTbl.size() > 0)
- if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_REPLAYING_UNDELIVERED_BCASTS);
-
- it=upTbl.entrySet().iterator();
- while(it.hasNext()) {
- msg=(Message)((Map.Entry)it.next()).getValue();
- it.remove();
- if(!msg.getSrc().equals(addr)) {
- if(log.isInfoEnabled())
- log.info("During replay: " +
- "discarding BCAST[" +
- ((TOTAL.Header)msg.getHeader(getName())).sequenceID +
- "] from " + _addrToString(msg.getSrc()));
- continue;
- }
- header=(Header)msg.removeHeader(getName());
- if(header.localSequenceID == NULL_ID) continue;
- _sendBcastRequest(msg, header.localSequenceID);
- }
- } // synchronized(upTbl)
- }
-
-
- /**
- * Send a unicast message: Add a <code>UCAST</code> header
- *
- * @param msg the message to unicast
- * @return the message to send
- */
- private Message _sendUcast(Message msg) {
- msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID, NULL_ID));
- return (msg);
- }
-
-
- /**
- * Replace the original message with a broadcast request sent to the
- * sequencer. The original bcast message is stored locally until a reply to
- * bcast is received from the sequencer. This function has the side-effect
- * of increasing the <code>localSeqID</code>
- *
- * @param msg the message to broadcast
- */
- private void _sendBcastRequest(Message msg) {
- _sendBcastRequest(msg, ++localSeqID);
- }
-
-
- /**
- * Replace the original message with a broadcast request sent to the
- * sequencer. The original bcast message is stored locally until a reply
- * to bcast is received from the sequencer
- *
- * @param msg the message to broadcast
- * @param id the local sequence ID to use
- */
- private void _sendBcastRequest(Message msg, long id) {
-
- // i. Store away the message while waiting for the sequencer's reply
- // ii. Send a bcast request immediatelly and also schedule a
- // retransmission
- synchronized(reqTbl) {
- reqTbl.put(Long.valueOf(id), msg);
- }
- _transmitBcastRequest(id);
- retransmitter.add(id, msg);
- }
-
-
- /**
- * Send the bcast request with the given localSeqID
- *
- * @param seqID the local sequence id of the
- */
- private void _transmitBcastRequest(long seqID) {
- Message reqMsg;
-
- // i. If NULL_STATE, then ignore, just transient state before
- // shutting down the retransmission thread
- // ii. If blocked, be patient - reschedule
- // iii. If the request is not pending any more, acknowledge it
- // iv. Create a broadcast request and send it to the sequencer
-
- if(state == NULL_STATE) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_TRANSMIT_BCAST_REQ_0__IN_NULL_STATE, seqID);
- return;
- }
- if(state == BLOCK) return;
-
- synchronized(reqTbl) {
- if(!reqTbl.containsKey(Long.valueOf(seqID))) {
- retransmitter.ack(seqID);
- return;
- }
- }
- reqMsg=new Message(sequencerAddr, addr, new byte[0]);
- reqMsg.putHeader(getName(), new Header(Header.REQ, seqID, NULL_ID));
-
- passDown(new Event(Event.MSG, reqMsg));
- }
-
-
- /**
- * Receive a unicast message: Remove the <code>UCAST</code> header
- *
- * @param msg the received unicast message
- */
- private void _recvUcast(Message msg) {
- msg.removeHeader(getName());
- }
-
- /**
- * Receive a broadcast message: Put it in the pending up queue and then
- * try to deliver above as many messages as possible
- *
- * @param msg the received broadcast message
- */
- private void _recvBcast(Message msg) {
- Header header=(Header)msg.getHeader(getName());
-
- // i. Put the message in the up pending queue only if it's not
- // already there, as it seems that the event may be received
- // multiple times before a view change when all members are
- // negotiating a common set of stable msgs
- //
- // ii. Deliver as many messages as possible
-
- synchronized(upTbl) {
- if(header.sequenceID <= seqID)
- return;
- upTbl.put(Long.valueOf(header.sequenceID), msg);
- }
-
- _deliverBcast();
- }
-
-
- /**
- * Received a bcast request - Ignore if not the sequencer, else send a
- * bcast reply
- *
- * @param msg the broadcast request message
- */
- private void _recvBcastRequest(Message msg) {
- Header header;
- Message repMsg;
-
- // i. If blocked, discard the bcast request
- // ii. Assign a seqID to the message and send it back to the requestor
-
- if(!addr.equals(sequencerAddr)) {
- if(log.isErrorEnabled())
- log.error("Received bcast request " +
- "but not a sequencer");
- return;
- }
- if(state == BLOCK) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_BLOCKED_DISCARD_BCAST_REQ);
- return;
- }
- header=(Header)msg.getHeader(getName());
- ++sequencerSeqID;
- repMsg=new Message(msg.getSrc(), addr, new byte[0]);
- repMsg.putHeader(getName(), new Header(Header.REP, header.localSequenceID,
- sequencerSeqID));
-
- passDown(new Event(Event.MSG, repMsg));
- }
-
-
- /**
- * Received a bcast reply - Match with the pending bcast request and move
- * the message in the list of messages to be delivered above
- *
- * @param header the header of the bcast reply
- */
- private void _recvBcastReply(Header header) {
- Message msg;
- long id;
-
- // i. If blocked, discard the bcast reply
- //
- // ii. Assign the received seqID to the message and broadcast it
- //
- // iii.
- // - Acknowledge the message to the retransmitter
- // - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps
- // - If localID == NULL_ID, it's a null BCAST, else normal BCAST
- // - Set the seq ID of the message to the one sent by the sequencer
-
- if(state == BLOCK) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_BLOCKED_DISCARD_BCAST_REP);
- return;
- }
-
- synchronized(reqTbl) {
- msg=(Message)reqTbl.remove(Long.valueOf(header.localSequenceID));
- }
-
- if(msg != null) {
- retransmitter.ack(header.localSequenceID);
- id=header.localSequenceID;
- }
- else {
- if(log.isInfoEnabled())
- log.info("Bcast reply to " +
- "non-existent BCAST_REQ[" + header.localSequenceID +
- "], Sending NULL bcast");
- id=NULL_ID;
- msg=new Message(null, addr, new byte[0]);
- }
- msg.putHeader(getName(), new Header(Header.BCAST, id, header.sequenceID));
-
- passDown(new Event(Event.MSG, msg));
- }
-
-
- /**
- * Resend the bcast request with the given localSeqID
- *
- * @param seqID the local sequence id of the
- */
- protected/*GemStoneAddition*/ void _retransmitBcastRequest(long seqID) {
- // *** Get a shared lock
- try {
- stateLock.readLock().acquire();
- try {
- if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_RETRANSMIT_BCAST_REQ_0, seqID);
- _transmitBcastRequest(seqID);
- }
- finally {
- stateLock.readLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- log.error(ExternalStrings.TOTAL_FAILED_ACQUIRING_A_READ_LOCK, e);
- }
- }
-
-
- /* Up event handlers
- * If the return value is true the event travels further up the stack
- * else it won't be forwarded
- */
-
- /**
- * Prepare for a VIEW_CHANGE: switch to flushing state
- *
- * @return true if the event is to be forwarded further up
- */
- private boolean _upBlock() {
- // *** Get an exclusive lock
- try {
- stateLock.writeLock().acquire();
- try {
- state=FLUSH;
- // *** Revoke the exclusive lock
- }
- finally {
- stateLock.writeLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- log.error(ExternalStrings.TOTAL_FAILED_ACQUIRING_THE_WRITE_LOCK, e);
- }
-
- return (true);
- }
-
-
- /**
- * Handle an up MSG event
- *
- * @param event the MSG event
- * @return true if the event is to be forwarded further up
- */
- private boolean _upMsg(Event event) {
- Message msg;
- Object obj;
- Header header;
-
- // *** Get a shared lock
- try {
- stateLock.readLock().acquire();
- try {
-
- // If NULL_STATE, shouldn't receive any msg on the up queue!
- if(state == NULL_STATE) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_UP_MSG_IN_NULL_STATE);
- return (false);
- }
-
- // Peek the header:
- //
- // (UCAST) A unicast message - Send up the stack
- // (BCAST) A broadcast message - Handle specially
- // (REQ) A broadcast request - Handle specially
- // (REP) A broadcast reply from the sequencer - Handle specially
- msg=(Message)event.getArg();
- if(!((obj=msg.getHeader(getName())) instanceof TOTAL.Header)) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_NO_TOTALHEADER_FOUND);
- return (false);
- }
- header=(Header)obj;
-
- switch(header.type) {
- case Header.UCAST:
- _recvUcast(msg);
- return (true);
- case Header.BCAST:
- _recvBcast(msg);
- return (false);
- case Header.REQ:
- _recvBcastRequest(msg);
- return (false);
- case Header.REP:
- _recvBcastReply(header);
- return (false);
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_UNKNOWN_HEADER_TYPE);
- return (false);
- }
-
- // ** Revoke the shared lock
- }
- finally {
- stateLock.readLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- if(log.isErrorEnabled()) log.error(e.getMessage());
- }
-
- return (true);
- }
-
-
- /**
- * Set the address of this group member
- *
- * @param event the SET_LOCAL_ADDRESS event
- * @return true if event should be forwarded further up
- */
- private boolean _upSetLocalAddress(Event event) {
- // *** Get an exclusive lock
- try {
- stateLock.writeLock().acquire();
- try {
- addr=(Address)event.getArg();
- }
- finally {
- stateLock.writeLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error(e.getMessage());
- }
- return (true);
- }
-
-
- /**
- * Handle view changes
- * <p/>
- * param event the VIEW_CHANGE event
- *
- * @return true if the event should be forwarded to the layer above
- */
- private boolean _upViewChange(Event event) {
- Object oldSequencerAddr;
-
- // *** Get an exclusive lock
- try {
- stateLock.writeLock().acquire();
- try {
-
- state=RUN;
-
- // i. See if this member is the sequencer
- // ii. If this is the sequencer, reset the sequencer's sequence ID
- // iii. Reset the last received sequence ID
- //
- // iv. Replay undelivered bcasts: Put all the undelivered bcasts
- // sent by us back to the req queue and discard the rest
- oldSequencerAddr=sequencerAddr;
- sequencerAddr=
- (Address)((View)event.getArg()).getMembers().elementAt(0);
- if(addr.equals(sequencerAddr)) {
- sequencerSeqID=NULL_ID;
- if((oldSequencerAddr == null) ||
- (!addr.equals(oldSequencerAddr)))
- if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_IM_THE_NEW_SEQUENCER);
- }
- seqID=NULL_ID;
- _replayBcast();
-
- // *** Revoke the exclusive lock
- }
- finally {
- stateLock.writeLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- log.error(e.getMessage());
- }
-
- return (true);
- }
-
-
- /*
- * Down event handlers
- * If the return value is true the event travels further down the stack
- * else it won't be forwarded
- */
-
-
- /**
- * Blocking confirmed - No messages should come from above until a
- * VIEW_CHANGE event is received. Switch to blocking state.
- *
- * @return true if event should travel further down
- */
- private boolean _downBlockOk() {
- // *** Get an exclusive lock
- try {
- stateLock.writeLock().acquire();
- try {
- state=BLOCK;
- }
- finally {
- stateLock.writeLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- log.error(e.getMessage());
- }
-
- return (true);
- }
-
-
- /**
- * A MSG event travelling down the stack. Forward unicast messages, treat
- * specially the broadcast messages.<br>
- * <p/>
- * If in <code>BLOCK</code> state, i.e. it has replied to a
- * <code>BLOCk_OK</code> and hasn't yet received a
- * <code>VIEW_CHANGE</code> event, messages are discarded<br>
- * <p/>
- * If in <code>FLUSH</code> state, forward unicast but queue broadcasts
- *
- * @param event the MSG event
- * @return true if event should travel further down
- */
- private boolean _downMsg(Event event) {
- Message msg;
-
- // *** Get a shared lock
- try {
- stateLock.readLock().acquire();
- try {
-
- // i. Discard all msgs, if in NULL_STATE
- // ii. Discard all msgs, if blocked
- if(state == NULL_STATE) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_DISCARD_MSG_IN_NULL_STATE);
- return (false);
- }
- if(state == BLOCK) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_BLOCKED_DISCARD_MSG);
- return (false);
- }
-
- msg=(Message)event.getArg();
- if(msg.getDest() == null) {
- _sendBcastRequest(msg);
- return (false);
- }
- else {
- msg=_sendUcast(msg);
- event.setArg(msg);
- }
-
- // ** Revoke the shared lock
- }
- finally {
- stateLock.readLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- log.error(e.getMessage());
- }
-
- return (true);
- }
-
-
- /**
- * Prepare this layer to receive messages from above
- */
- @Override // GemStoneAddition
- public void start() throws Exception {
- TimeScheduler timer;
-
- timer=stack != null ? stack.timer : null;
- if(timer == null)
- throw new Exception("TOTAL.start(): timer is null");
-
- reqTbl=new TreeMap();
- upTbl=new TreeMap();
- retransmitter=new AckSenderWindow(new Command(), AVG_RETRANSMIT_INTERVAL);
- }
-
-
- /**
- * Handle the stop() method travelling down the stack.
- * <p/>
- * The local addr is set to null, since after a Start->Stop->Start
- * sequence this member's addr is not guaranteed to be the same
- */
- @Override // GemStoneAddition
- public void stop() {
- try {
- stateLock.writeLock().acquire();
- try {
- state=NULL_STATE;
- retransmitter.reset();
- reqTbl.clear();
- upTbl.clear();
- addr=null;
- }
- finally {
- stateLock.writeLock().release();
- }
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- log.error(e.getMessage());
- }
- }
-
-
- /**
- * Process an event coming from the layer below
- *
- * @param event the event to process
- */
- private void _up(Event event) {
- switch(event.getType()) {
- case Event.BLOCK:
- if(!_upBlock()) return;
- break;
- case Event.MSG:
- if(!_upMsg(event)) return;
- break;
- case Event.SET_LOCAL_ADDRESS:
- if(!_upSetLocalAddress(event)) return;
- break;
- case Event.VIEW_CHANGE:
- if(!_upViewChange(event)) return;
- break;
- default:
- break;
- }
-
- passUp(event);
- }
-
-
- /**
- * Process an event coming from the layer above
- *
- * @param event the event to process
- */
- private void _down(Event event) {
- switch(event.getType()) {
- case Event.BLOCK_OK:
- if(!_downBlockOk()) return;
- break;
- case Event.MSG:
- if(!_downMsg(event)) return;
- break;
- default:
- break;
- }
-
- passDown(event);
- }
-
-
- /**
- * Create the TOTAL layer
- */
- public TOTAL() {
- }
-
-
- // Methods deriving from <code>Protocol</code>
- // javadoc inherited from superclass
- @Override // GemStoneAddition
- public String getName() {
- return (_getName());
- }
-
- // javadoc inherited from superclass
- @Override // GemStoneAddition
- public boolean setProperties(Properties properties) {
- return (_setProperties(properties));
- }
-
- // javadoc inherited from superclass
- @Override // GemStoneAddition
- public Vector requiredDownServices() {
- return (_requiredDownServices());
- }
-
- // javadoc inherited from superclass
- @Override // GemStoneAddition
- public Vector requiredUpServices() {
- return (_requiredUpServices());
- }
-
- // javadoc inherited from superclass
- @Override // GemStoneAddition
- public void up(Event event) {
- _up(event);
- }
-
- // javadoc inherited from superclass
- @Override // GemStoneAddition
- public void down(Event event) {
- _down(event);
- }
-}