You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/02/15 10:32:39 UTC
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java
pero 2005/02/15 01:32:39
Modified: modules/cluster/src/share/org/apache/catalina/cluster/tcp
ReplicationTransmitter.java
Log:
Add dynamic property handling to ReplicationTransmitter to transfer attributes to senders
Add autoConnect and waitForAck handling
Revision Changes Path
1.21 +157 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Index: ReplicationTransmitter.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- ReplicationTransmitter.java 27 Dec 2004 09:30:36 -0000 1.20
+++ ReplicationTransmitter.java 15 Feb 2005 09:32:39 -0000 1.21
@@ -16,7 +16,9 @@
package org.apache.catalina.cluster.tcp;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -26,12 +28,25 @@
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.io.XByteBuffer;
import org.apache.catalina.util.StringManager;
+import org.apache.tomcat.util.IntrospectionUtils;
+
+/**
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version 1.2
+ *
+ */
public class ReplicationTransmitter implements ClusterSender {
private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(ReplicationTransmitter.class);
/**
+ * The descriptive information about this implementation.
+ */
+ private static final String info = "ReplicationTransmitter/1.2";
+
+ /**
* The string manager for this package.
*/
protected StringManager sm = StringManager.getManager(Constants.Package);
@@ -49,10 +64,31 @@
private long ackTimeout = 15000; //15 seconds by default
+ private boolean waitForAck = true ;
+
private SimpleTcpCluster cluster;
private ObjectName objectName;
+ private boolean autoConnect = true ;
+
+ private Map properties = new HashMap();
+
+ private long failureCounter = 0 ;
+
+ // ------------------------------------------------------------- Properties
+
+ /**
+ * Return descriptive information about this implementation and the
+ * corresponding version number, in the format
+ * <code>&lt;description&gt;/&lt;version&gt;</code>.
+ */
+ public String getInfo() {
+
+ return (info);
+
+ }
+
private synchronized void addStats(int length) {
nrOfRequests++;
totalBytes += length;
@@ -64,6 +100,16 @@
}
}
+
+ /**
+ * Reset sender statistics (request count, total bytes and failure counter).
+ */
+ public synchronized void resetStatistics() {
+ nrOfRequests = 0;
+ totalBytes = 0;
+ failureCounter = 0;
+ }
+
/**
* @return Returns the nrOfRequests.
*/
@@ -88,13 +134,14 @@
throw new IllegalArgumentException(msg);
}
-
+
public synchronized void add(Member member) {
try {
String key = getKey(member);
if (!map.containsKey(key)) {
IDataSender sender = IDataSenderFactory.getIDataSender(
replicationMode, member);
+ transferSenderProperty(sender);
map.put(key, sender);
registerSenderMBean(member, sender);
}
@@ -103,7 +150,19 @@
}
}//add
- private String getKey(Member member) {
+ /**
+ * Transfer all properties from the transmitter to the concrete sender.
+ * @param sender the sender that receives the transmitter's properties
+ */
+ protected void transferSenderProperty(IDataSender sender) {
+ for (Iterator iter = getPropertyNames(); iter.hasNext();) {
+ String pkey = (String) iter.next();
+ Object value = getProperty(pkey);
+ IntrospectionUtils.setProperty(sender, pkey, value.toString());
+ }
+ }
+
+ protected String getKey(Member member) {
return member.getHost() + ":" + member.getPort();
}
@@ -234,16 +293,28 @@
return result;
}
+ /**
+ * Send message to concrete sender. If autoConnect is true, check whether the
+ * connection is broken and, if so, reconnect the sender before sending.
+ * <ul>
+ * <li>On failure the suspect flag is set to true. After successfully
+ * sending, the suspect flag is set to false.</li>
+ * <li>Statistics are only updated after successful sending.</li>
+ * </ul>
+ *
+ * @param sessionId Unique Message Id
+ * @param data message Data
+ * @param sender concrete message sender
+ * @throws java.io.IOException e.g. when the sender is not available
+ */
protected void sendMessageData(String sessionId, byte[] data,
IDataSender sender) throws java.io.IOException {
if (sender == null)
throw new java.io.IOException(
"Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
try {
- if (!sender.isConnected())
+ if (autoConnect && !sender.isConnected())
sender.connect();
- //set the timeout, will be ignored by async senders
- sender.setAckTimeout(getAckTimeout());
sender.sendMessage(sessionId, data);
sender.setSuspect(false);
addStats(data.length);
@@ -257,7 +328,7 @@
}
}
sender.setSuspect(true);
-
+ failureCounter++ ;
}
}
@@ -293,19 +364,53 @@
return replicationMode;
}
+ /**
+ * @return true if the replication mode is the sync or pooled sync mode
+ * @deprecated since Version 1.1
+ */
public boolean getIsSenderSynchronized() {
return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
|| IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
}
+ /**
+ * @return Returns the autoConnect.
+ */
+ public boolean isAutoConnect() {
+ return autoConnect;
+ }
+ /**
+ * @param autoConnect The autoConnect to set.
+ */
+ public void setAutoConnect(boolean autoConnect) {
+ this.autoConnect = autoConnect;
+ setProperty("autoConnect", String.valueOf(autoConnect));
+
+ }
+
public long getAckTimeout() {
return ackTimeout;
}
public void setAckTimeout(long ackTimeout) {
this.ackTimeout = ackTimeout;
+ setProperty("ackTimeout", String.valueOf(ackTimeout));
}
+ /**
+ * @return Returns the waitForAck.
+ */
+ public boolean isWaitForAck() {
+ return waitForAck;
+ }
+ /**
+ * @param waitForAck The waitForAck to set.
+ */
+ public void setWaitForAck(boolean waitForAck) {
+ this.waitForAck = waitForAck;
+ setProperty("waitForAck", String.valueOf(waitForAck));
+ }
+
/*
* (non-Javadoc)
*
@@ -316,4 +421,50 @@
}
+ /**
+ * Store a config attribute; stored attributes are transferred to new senders by introspection.
+ * @param name attribute name
+ * @param value attribute value
+ */
+ public void setProperty( String name, Object value ) {
+ if( log.isTraceEnabled())
+ log.trace(sm.getString("ReplicationTransmitter.setProperty", name, value));
+
+ properties.put(name, value);
+ }
+
+ /**
+ * Get the current value of a config attribute.
+ * @param key attribute name
+ * @return the stored value, or null if the attribute is not set
+ */
+ public Object getProperty( String key ) {
+ if( log.isTraceEnabled())
+ log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
+ return properties.get(key);
+ }
+
+ /**
+ * Get all property keys.
+ * @return iterator over the names of the stored properties
+ */
+ public Iterator getPropertyNames() {
+ return properties.keySet().iterator();
+ }
+
+
+ /**
+ * Remove a configured property.
+ * @param key name of the property to remove
+ */
+ public void removeProperty(String key) {
+ properties.remove(key);
+ }
+
+ /**
+ * @return Returns the failureCounter.
+ */
+ public long getFailureCounter() {
+ return failureCounter;
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org