You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/03/25 23:12:32 UTC
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java
pero 2005/03/25 14:12:32
Modified: modules/cluster/src/share/org/apache/catalina/cluster/tcp
ReplicationTransmitter.java
Log:
Refactor Code
Add compress transfer handling
Add api docs
Revision Changes Path
1.22 +444 -250 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Index: ReplicationTransmitter.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- ReplicationTransmitter.java 15 Feb 2005 09:32:39 -0000 1.21
+++ ReplicationTransmitter.java 25 Mar 2005 22:12:31 -0000 1.22
@@ -16,6 +16,7 @@
package org.apache.catalina.cluster.tcp;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -24,18 +25,22 @@
import javax.management.ObjectName;
import org.apache.catalina.cluster.ClusterSender;
-import org.apache.catalina.cluster.Constants;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.io.XByteBuffer;
import org.apache.catalina.util.StringManager;
import org.apache.tomcat.util.IntrospectionUtils;
-
/**
+ * Transmits messages to other cluster members. The senders are created
+ * according to the replicationMode type.
+ * FIXME i18n log messages
+ * FIXME compress data depends on message type and size
+ * FIXME send very big messages at some block see FarmWarDeployer!
+ * TODO pause and resume senders
+ *
* @author Peter Rossbach
* @author Filip Hanik
- * @version 1.2
- *
+ * @version $Revision$ $Date$
*/
public class ReplicationTransmitter implements ClusterSender {
private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
@@ -44,37 +49,69 @@
/**
* The descriptive information about this implementation.
*/
- private static final String info = "ReplicationTransmitter/1.2";
+ private static final String info = "ReplicationTransmitter/1.3";
/**
* The string manager for this package.
*/
protected StringManager sm = StringManager.getManager(Constants.Package);
- private java.util.HashMap map = new java.util.HashMap();
+ private Map map = new HashMap();
public ReplicationTransmitter() {
}
+ /**
+ * number of transmitted messages
+ */
private long nrOfRequests = 0;
+ /**
+ * number of transmitted bytes
+ */
private long totalBytes = 0;
+ private long failureCounter = 0;
+
+ /**
+ * current sender replication mode
+ */
private String replicationMode;
+ /**
+ * sender default ackTimeout
+ */
private long ackTimeout = 15000; //15 seconds by default
- private boolean waitForAck = true ;
-
- private SimpleTcpCluster cluster;
+ /**
+ * enabled wait for ack
+ */
+ private boolean waitForAck = true;
- private ObjectName objectName;
+ /**
+ * autoConnect sender when next message send
+ */
+ private boolean autoConnect = true;
- private boolean autoConnect = true ;
+ /**
+ * Compress message data bytes
+ */
+ private boolean compress = true;
+ /**
+ * dynamic sender <code>properties</code>
+ */
private Map properties = new HashMap();
- private long failureCounter = 0 ;
+ /**
+ * my cluster
+ */
+ private SimpleTcpCluster cluster;
+
+ /**
+ * Transmitter Mbean name
+ */
+ private ObjectName objectName;
// ------------------------------------------------------------- Properties
@@ -89,27 +126,6 @@
}
- private synchronized void addStats(int length) {
- nrOfRequests++;
- totalBytes += length;
- if (log.isDebugEnabled() &&
- (nrOfRequests % 100) == 0) {
- log.debug("Nr of bytes sent=" + totalBytes + " over "
- + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
- + " bytes/request");
- }
-
- }
-
- /*
- * Reset sender statistics
- */
- public synchronized void resetStatistics() {
- nrOfRequests = 0;
- totalBytes = 0;
- failureCounter = 0;
- }
-
/**
* @return Returns the nrOfRequests.
*/
@@ -124,6 +140,28 @@
return totalBytes;
}
+ /**
+ * @return Returns the failureCounter.
+ */
+ public long getFailureCounter() {
+ return failureCounter;
+ }
+
+ /**
+ * current replication mode
+ *
+ * @return
+ */
+ public String getReplicationMode() {
+ return replicationMode;
+ }
+
+ /**
+ * set replication Mode (pooled, synchronous, asynchronous, fastasyncqueue)
+ *
+ * @see IDataSenderFactory#validateMode(String)
+ * @param mode
+ */
public void setReplicationMode(String mode) {
String msg = IDataSenderFactory.validateMode(mode);
if (msg == null) {
@@ -134,94 +172,195 @@
throw new IllegalArgumentException(msg);
}
-
- public synchronized void add(Member member) {
- try {
- String key = getKey(member);
- if (!map.containsKey(key)) {
- IDataSender sender = IDataSenderFactory.getIDataSender(
- replicationMode, member);
- transferSenderProperty(sender);
- map.put(key, sender);
- registerSenderMBean(member, sender);
- }
- } catch (java.io.IOException x) {
- log.error("Unable to create and add a IDataSender object.", x);
- }
- }//add
/**
- * Transfer all properties from transmitter to concrete sender
- * @param sender
+ * Transmitter ObjectName
+ *
+ * @param name
*/
- protected void transferSenderProperty(IDataSender sender) {
- for (Iterator iter = getPropertyNames(); iter.hasNext();) {
- String pkey = (String) iter.next();
- Object value = getProperty(pkey);
- IntrospectionUtils.setProperty(sender, pkey, value.toString());
- }
+ public void setObjectName(ObjectName name) {
+ objectName = name;
}
- protected String getKey(Member member) {
- return member.getHost() + ":" + member.getPort();
+ public ObjectName getObjectName() {
+ return objectName;
}
- public synchronized void remove(Member member) {
- String key = getKey(member);
- IDataSender toberemoved = (IDataSender) map.get(key);
- if (toberemoved == null)
- return;
- unregisterSenderMBean(toberemoved);
- toberemoved.disconnect();
- map.remove(key);
-
+ /**
+ * @return Returns the compress.
+ */
+ public boolean isCompress() {
+ return compress;
}
- protected void unregisterSenderMBean(IDataSender sender) {
- try {
- MBeanServer mserver = cluster.getMBeanServer();
- if (mserver != null) {
- mserver.unregisterMBean(getSenderObjectName(sender));
- }
- } catch (Exception e) {
- log.warn(e);
- }
+ /**
+ * @param compressMessageData
+ * The compress flag to set.
+ */
+ public void setCompress(boolean compressMessageData) {
+ this.compress = compressMessageData;
}
- protected void registerSenderMBean(Member member, IDataSender sender) {
- if (member != null && cluster != null) {
- try {
- MBeanServer mserver = cluster.getMBeanServer();
- ObjectName senderName = getSenderObjectName(sender);
- if (mserver.isRegistered(senderName)) {
- if (log.isWarnEnabled())
- log.warn(sm.getString(
- "cluster.mbean.register.allready", senderName));
- return;
- }
- mserver.registerMBean(cluster.getManagedBean(sender),
- senderName);
- } catch (Exception e) {
- log.warn(e);
- }
- }
+ /**
+ * @return Returns the autoConnect.
+ */
+ public boolean isAutoConnect() {
+ return autoConnect;
}
- protected ObjectName getSenderObjectName(IDataSender sender) {
- ObjectName senderName = null;
- try {
- ObjectName clusterName = cluster.getObjectName();
- MBeanServer mserver = cluster.getMBeanServer();
- senderName = new ObjectName(clusterName.getDomain()
- + ":type=IDataSender,host="
- + clusterName.getKeyProperty("host") + ",senderAddress="
- + sender.getAddress().getHostAddress() + ",senderPort=" + sender.getPort());
- } catch (Exception e) {
- log.warn(e);
+ /**
+ * @param autoConnect
+ * The autoConnect to set.
+ */
+ public void setAutoConnect(boolean autoConnect) {
+ this.autoConnect = autoConnect;
+ setProperty("autoConnect", String.valueOf(autoConnect));
+
+ }
+
+ /**
+ * @return
+ */
+ public long getAckTimeout() {
+ return ackTimeout;
+ }
+
+ /**
+ * @param ackTimeout
+ */
+ public void setAckTimeout(long ackTimeout) {
+ this.ackTimeout = ackTimeout;
+ setProperty("ackTimeout", String.valueOf(ackTimeout));
+ }
+
+ /**
+ * @return Returns the waitForAck.
+ */
+ public boolean isWaitForAck() {
+ return waitForAck;
+ }
+
+ /**
+ * @param waitForAck
+ * The waitForAck to set.
+ */
+ public void setWaitForAck(boolean waitForAck) {
+ this.waitForAck = waitForAck;
+ setProperty("waitForAck", String.valueOf(waitForAck));
+ }
+
+ /*
+ * configured in cluster
+ *
+ * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
+ */
+ public void setCatalinaCluster(SimpleTcpCluster cluster) {
+ this.cluster = cluster;
+
+ }
+
+ /**
+ * @return
+ * @deprecated since version 5.5.7
+ */
+ public boolean getIsSenderSynchronized() {
+ return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
+ || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
+ }
+
+ // ------------------------------------------------------------- dynamic
+ // sender property handling
+
+ /**
+ * set config attributes with reflect
+ *
+ * @param name
+ * @param value
+ */
+ public void setProperty(String name, Object value) {
+ if (log.isTraceEnabled())
+ log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
+ value));
+
+ properties.put(name, value);
+ }
+
+ /**
+ * get current config
+ *
+ * @param key
+ * @return
+ */
+ public Object getProperty(String key) {
+ if (log.isTraceEnabled())
+ log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
+ return properties.get(key);
+ }
+
+ /**
+ * Get all properties keys
+ *
+ * @return
+ */
+ public Iterator getPropertyNames() {
+ return properties.keySet().iterator();
+ }
+
+ /**
+ * remove a configured property.
+ *
+ * @param key
+ */
+ public void removeProperty(String key) {
+ properties.remove(key);
+ }
+
+ // ------------------------------------------------------------- public
+
+ /**
+ * Send data to one member
+ *
+ * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
+ * byte[], org.apache.catalina.cluster.Member)
+ */
+ public void sendMessage(String sessionId, byte[] indata, Member member)
+ throws java.io.IOException {
+ byte[] data = convertSenderData(indata);
+ String key = getKey(member);
+ IDataSender sender = (IDataSender) map.get(key);
+ sendMessageData(sessionId, data, sender);
+ }
+
+ /**
+ * send message to all senders (broadcast)
+ *
+ * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
+ * byte[])
+ */
+ public void sendMessage(String sessionId, byte[] indata)
+ throws java.io.IOException {
+ IDataSender[] senders = getSenders();
+ byte[] data = convertSenderData(indata);
+ for (int i = 0; i < senders.length; i++) {
+
+ IDataSender sender = senders[i];
+ try {
+ sendMessageData(sessionId, data, sender);
+ } catch (Exception x) {
+
+ if (!sender.getSuspect())
+ log.warn("Unable to send replicated message to " + sender
+ + ", is server down?", x);
+ sender.setSuspect(true);
+ }
}
- return senderName;
}
+ /**
+ * start the sender and register transmitter mbean
+ *
+ * @see org.apache.catalina.cluster.ClusterSender#start()
+ */
public void start() throws java.io.IOException {
if (cluster != null) {
ObjectName clusterName = cluster.getObjectName();
@@ -248,14 +387,11 @@
}
- public void setObjectName(ObjectName name) {
- objectName = name;
- }
-
- public ObjectName getObjectName() {
- return objectName;
- }
-
+ /*
+ * stop the sender and deregister mbeans (transmitter, senders)
+ *
+ * @see org.apache.catalina.cluster.ClusterSender#stop()
+ */
public synchronized void stop() {
Iterator i = map.entrySet().iterator();
while (i.hasNext()) {
@@ -279,192 +415,250 @@
}
+ /**
+ * get all current senders
+ *
+ * @return
+ */
public IDataSender[] getSenders() {
- java.util.Iterator i = map.entrySet().iterator();
- java.util.Vector v = new java.util.Vector();
- while (i.hasNext()) {
- IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next())
- .getValue();
+ java.util.Iterator iter = map.entrySet().iterator();
+ IDataSender[] array = new IDataSender[map.size()];
+ int i = 0;
+ while (iter.hasNext()) {
+ IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
+ .next()).getValue();
if (sender != null)
- v.addElement(sender);
+ array[i] = sender;
+ i++;
}
- IDataSender[] result = new IDataSender[v.size()];
- v.copyInto(result);
- return result;
+ return array;
}
/**
- * Send message to concrete sender. If autoConnect is true, check is connection broken
- * and the reconnect the complete sender.
- * <ul>
- * <li>failure the suspect flag is set true. After successfully
- * sending the suspect flag is set to false.</li>
- * <li>Stats is only update after sussesfull sending</li>
- * </ul>
+ * get the MBean ObjectNames of all current senders
*
- * @param sessionId Unique Message Id
- * @param data message Data
- * @param sender concrete message sender
- * @throws java.io.IOException
+ * @return
*/
- protected void sendMessageData(String sessionId, byte[] data,
- IDataSender sender) throws java.io.IOException {
- if (sender == null)
- throw new java.io.IOException(
- "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
- try {
- if (autoConnect && !sender.isConnected())
- sender.connect();
- sender.sendMessage(sessionId, data);
- sender.setSuspect(false);
- addStats(data.length);
- } catch (Exception x) {
- if (log.isWarnEnabled()) {
- if (!sender.getSuspect()) {
- log
- .warn(
- "Unable to send replicated message, is server down?",
- x);
- }
- }
- sender.setSuspect(true);
- failureCounter++ ;
+ public ObjectName[] getSenderObjectNames() {
+ java.util.Iterator iter = map.entrySet().iterator();
+ ObjectName array[] = new ObjectName[map.size()];
+ int i = 0;
+ while (iter.hasNext()) {
+ IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
+ .next()).getValue();
+ if (sender != null)
+ array[i] = getSenderObjectName(sender);
+ i++;
}
-
+ return array;
}
- public void sendMessage(String sessionId, byte[] indata, Member member)
- throws java.io.IOException {
- byte[] data = XByteBuffer.createDataPackage(indata);
- String key = getKey(member);
- IDataSender sender = (IDataSender) map.get(key);
- sendMessageData(sessionId, data, sender);
+ /*
+ * Reset sender statistics
+ */
+ public synchronized void resetStatistics() {
+ nrOfRequests = 0;
+ totalBytes = 0;
+ failureCounter = 0;
}
- public void sendMessage(String sessionId, byte[] indata)
- throws java.io.IOException {
- IDataSender[] senders = getSenders();
- byte[] data = XByteBuffer.createDataPackage(indata);
- for (int i = 0; i < senders.length; i++) {
-
- IDataSender sender = senders[i];
- try {
- sendMessageData(sessionId, data, sender);
- } catch (Exception x) {
-
- if (!sender.getSuspect())
- log.warn("Unable to send replicated message to " + sender
- + ", is server down?", x);
- sender.setSuspect(true);
+ /*
+ * add new cluster member and create sender ( s. replicationMode) transfer
+ * current properties to sender
+ *
+ * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member)
+ */
+ public synchronized void add(Member member) {
+ try {
+ String key = getKey(member);
+ if (!map.containsKey(key)) {
+ IDataSender sender = IDataSenderFactory.getIDataSender(
+ replicationMode, member);
+ transferSenderProperty(sender);
+ map.put(key, sender);
+ registerSenderMBean(member, sender);
}
- }//while
- }
-
- public String getReplicationMode() {
- return replicationMode;
+ } catch (java.io.IOException x) {
+ log.error("Unable to create and add a IDataSender object.", x);
+ }
}
/**
- * @return
- * @deprecated since Version 1.1
+ * remove sender from transmitter. ( deregister mbean and disconnect sender )
+ *
+ * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member)
*/
- public boolean getIsSenderSynchronized() {
- return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
- || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
+ public synchronized void remove(Member member) {
+ String key = getKey(member);
+ IDataSender toberemoved = (IDataSender) map.get(key);
+ if (toberemoved == null)
+ return;
+ unregisterSenderMBean(toberemoved);
+ toberemoved.disconnect();
+ map.remove(key);
+
}
+ // ------------------------------------------------------------- protected
+
/**
- * @return Returns the autoConnect.
+ * Count the number of requests and transferred bytes. Logs stats every 100 requests
+ *
+ * @param length
*/
- public boolean isAutoConnect() {
- return autoConnect;
+ protected synchronized void addStats(int length) {
+ nrOfRequests++;
+ totalBytes += length;
+ if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
+ log.debug("Nr of bytes sent=" + totalBytes + " over "
+ + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
+ + " bytes/request; failures=" + failureCounter);
+ }
+
}
+
/**
- * @param autoConnect The autoConnect to set.
+ * Transfer all properties from transmitter to concrete sender
+ *
+ * @param sender
*/
- public void setAutoConnect(boolean autoConnect) {
- this.autoConnect = autoConnect;
- setProperty("autoConnect", String.valueOf(autoConnect));
-
- }
-
- public long getAckTimeout() {
- return ackTimeout;
- }
-
- public void setAckTimeout(long ackTimeout) {
- this.ackTimeout = ackTimeout;
- setProperty("ackTimeout", String.valueOf(ackTimeout));
+ protected void transferSenderProperty(IDataSender sender) {
+ for (Iterator iter = getPropertyNames(); iter.hasNext();) {
+ String pkey = (String) iter.next();
+ Object value = getProperty(pkey);
+ IntrospectionUtils.setProperty(sender, pkey, value.toString());
+ }
}
/**
- * @return Returns the waitForAck.
+ * set unique key to find sender
+ *
+ * @param member
+ * @return concat member.host:member.port
*/
- public boolean isWaitForAck() {
- return waitForAck;
+ protected String getKey(Member member) {
+ return member.getHost() + ":" + member.getPort();
}
+
/**
- * @param waitForAck The waitForAck to set.
- */
- public void setWaitForAck(boolean waitForAck) {
- this.waitForAck = waitForAck;
- setProperty("waitForAck", String.valueOf(waitForAck));
- }
-
- /*
- * (non-Javadoc)
+ * unregister sender MBean
*
- * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
+ * @see #getSenderObjectName(IDataSender)
+ * @param sender
*/
- public void setCatalinaCluster(SimpleTcpCluster cluster) {
- this.cluster = cluster;
-
+ protected void unregisterSenderMBean(IDataSender sender) {
+ try {
+ MBeanServer mserver = cluster.getMBeanServer();
+ if (mserver != null) {
+ mserver.unregisterMBean(getSenderObjectName(sender));
+ }
+ } catch (Exception e) {
+ log.warn(e);
+ }
}
- /**
- * set config attributes with reflect
- * @param name
- * @param value
+ /**
+ * register MBean and check it exist (big problem!)
+ *
+ * @param member
+ * @param sender
*/
- public void setProperty( String name, Object value ) {
- if( log.isTraceEnabled())
- log.trace(sm.getString("ReplicationTransmitter.setProperty", name, value));
-
- properties.put(name, value);
+ protected void registerSenderMBean(Member member, IDataSender sender) {
+ if (member != null && cluster != null) {
+ try {
+ MBeanServer mserver = cluster.getMBeanServer();
+ ObjectName senderName = getSenderObjectName(sender);
+ if (mserver.isRegistered(senderName)) {
+ if (log.isWarnEnabled())
+ log.warn(sm.getString(
+ "cluster.mbean.register.allready", senderName));
+ return;
+ }
+ mserver.registerMBean(cluster.getManagedBean(sender),
+ senderName);
+ } catch (Exception e) {
+ log.warn(e);
+ }
+ }
}
/**
- * get current config
- * @param key
+ * build sender ObjectName (
+ * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" )
+ *
+ * @param sender
* @return
*/
- public Object getProperty( String key ) {
- if( log.isTraceEnabled())
- log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
- return properties.get(key);
+ protected ObjectName getSenderObjectName(IDataSender sender) {
+ ObjectName senderName = null;
+ try {
+ ObjectName clusterName = cluster.getObjectName();
+ MBeanServer mserver = cluster.getMBeanServer();
+ senderName = new ObjectName(clusterName.getDomain()
+ + ":type=IDataSender,host="
+ + clusterName.getKeyProperty("host") + ",senderAddress="
+ + sender.getAddress().getHostAddress() + ",senderPort="
+ + sender.getPort());
+ } catch (Exception e) {
+ log.warn(e);
+ }
+ return senderName;
}
/**
- * Get all properties keys
+ * compress data
+ *
+ * @see XByteBuffer#createDataPackage(byte[])
+ * @param data
* @return
+ * @throws IOException
+ * FIXME get CompressMessageData from cluster instance
*/
- public Iterator getPropertyNames() {
- return properties.keySet().iterator();
+ protected byte[] convertSenderData(byte[] data) throws IOException {
+ return XByteBuffer.createDataPackage(data, isCompress());
}
-
- /**
- * remove a configured property.
- * @param key
- */
- public void removeProperty(String key) {
- properties.remove(key);
- }
-
/**
- * @return Returns the failureCounter.
+ * Send message to concrete sender. If autoConnect is true, check whether the
+ * sender is connected and, if it is not, connect it before sending.
+ * <ul>
+ * <li>On failure the suspect flag is set to true. After successfully sending, the
+ * suspect flag is set to false.</li>
+ * <li>Stats are only updated after successful sending</li>
+ * </ul>
+ *
+ * @param sessionId
+ * Unique Message Id
+ * @param data
+ * message Data
+ * @param sender
+ * concrete message sender
+ * @throws java.io.IOException
*/
- public long getFailureCounter() {
- return failureCounter;
+ protected void sendMessageData(String sessionId, byte[] data,
+ IDataSender sender) throws java.io.IOException {
+ if (sender == null)
+ throw new java.io.IOException(
+ "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
+ try {
+ if (autoConnect && !sender.isConnected())
+ sender.connect();
+ sender.sendMessage(sessionId, data);
+ sender.setSuspect(false);
+ addStats(data.length);
+ } catch (Exception x) {
+ if (log.isWarnEnabled()) {
+ if (!sender.getSuspect()) {
+ log
+ .warn(
+ "Unable to send replicated message, is server down?",
+ x);
+ }
+ }
+ sender.setSuspect(true);
+ failureCounter++;
+ }
+
}
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org