You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/11/04 17:26:03 UTC
svn commit: rev 56603 - in incubator/directory/seda/trunk/src/java/org/apache/seda: decoder encoder
Author: trustin
Date: Thu Nov 4 08:26:01 2004
New Revision: 56603
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
Log:
DefaultEncoderManager and DefaultDecoderManager now work without subscribing to ConnectEvent.
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java Thu Nov 4 08:26:01 2004
@@ -27,8 +27,6 @@
import org.apache.commons.codec.stateful.DecoderCallback;
import org.apache.commons.codec.stateful.StatefulDecoder;
import org.apache.seda.event.AbstractSubscriber;
-import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.ConnectSubscriber;
import org.apache.seda.event.DisconnectEvent;
import org.apache.seda.event.DisconnectSubscriber;
import org.apache.seda.event.EventRouter;
@@ -37,7 +35,6 @@
import org.apache.seda.event.RequestEvent;
import org.apache.seda.event.filter.EventTypeFilter;
import org.apache.seda.listener.ClientKey;
-import org.apache.seda.listener.KeyExpiryException;
import org.apache.seda.listener.UDPClientKey;
import org.apache.seda.protocol.InetServiceEntry;
import org.apache.seda.protocol.InetServicesDatabase;
@@ -55,7 +52,7 @@
* @version $Rev$
*/
public class DefaultDecoderManager extends DefaultStage
- implements DecoderManager, InputSubscriber, ConnectSubscriber,
+ implements DecoderManager, InputSubscriber,
DisconnectSubscriber
{
/** event router or bus this component subscribes and publishes events on */
@@ -91,7 +88,6 @@
super.setStageMonitor(new LoggingStageMonitor(getClass()));
router.subscribe(new EventTypeFilter(InputEvent.class), this);
- router.subscribe(new EventTypeFilter(ConnectEvent.class), this);
router.subscribe(new EventTypeFilter(DisconnectEvent.class), this);
}
@@ -142,93 +138,6 @@
decoders.remove(event.getClientKey());
}
- /**
- * Temporary place holder for functionality that looks up a protocol
- * specific StatefulDecoder.
- *
- * @param key the client key used to determine associated protocol
- * @return the new stateful nonblocking protocol specific decoder
- */
- private StatefulDecoder createDecoder(ClientKey key)
- throws KeyExpiryException
- {
- TransportTypeEnum transportType;
- if (key instanceof UDPClientKey) {
- transportType = TransportTypeEnum.UDP;
- } else {
- transportType = TransportTypeEnum.TCP;
- }
-
- Iterator it = inetdb.getByPort(key.getLocalAddress().getPort());
- ProtocolProvider provider = null;
- while (it.hasNext()) {
- InetServiceEntry entry = (InetServiceEntry) it.next();
- if (entry.getTransport() == transportType) {
- provider = entry.getProtocolProvider();
- }
- }
-
- // TODO replace RuntimeException with ProtocolProviderNotFoundException
- if (provider == null)
- throw new RuntimeException("No protocol provider available");
-
- return provider.getDecoderFactory().createDecoder();
- }
-
- /**
- * We basically create a new client decoder and put it into a map for
- * use later when we are processing input events from the client.
- *
- * @see org.apache.seda.event.ConnectSubscriber#inform(
- * org.apache.seda.event.ConnectEvent)
- */
- public void inform(ConnectEvent event)
- {
- StatefulDecoder decoder = null;
- ClientKey key = event.getClientKey();
-
- try
- {
- decoder = new ClientDecoder(key, createDecoder(key));
- }
- catch (KeyExpiryException e)
- {
- monitor.failedOnInform(this, event, e);
- return;
- }
-
- /*
- * Here the decoder informs us that a unit of data is decoded. In the
- * case of the snickers decoder we're decoding an LDAP message envelope
- * for a request. We use this request to create a RequestEvent and
- * publish the event on the queue.
- */
- decoder.setCallback(new DecoderCallback()
- {
- public void decodeOccurred(
- StatefulDecoder decoder,
- Object decoded)
- {
- ClientKey key = ((ClientDecoder) decoder).getClientKey();
- RequestEvent event = new RequestEvent(this, key, decoded);
- router.publish(event);
- }
- });
-
- /*
- * For potential race conditions between ConnectEvent processing and
- * the processing of the first InputEvent we synchronize on the decoders
- * and notify all when we have altered it. The thread that is waiting
- * for a client decoder in the decoders map will wait on the map until
- * awoken.
- */
- synchronized (decoders)
- {
- decoders.put(key, decoder);
- decoders.notifyAll();
- }
- }
-
// ------------------------------------------------------------------------
// Service Interface Methods
// ------------------------------------------------------------------------
@@ -252,16 +161,7 @@
throws DecoderException
{
// replace this decoder with a real one later
- StatefulDecoder decoder = null;
-
- try
- {
- decoder = createDecoder(key);
- }
- catch (KeyExpiryException e)
- {
- throw new DecoderException("client key has expired");
- }
+ StatefulDecoder decoder = decoder = createDecoder(key);
// used array to set a value on final variable and get by compiler
final Object[] decoded = new Object[1];
@@ -318,33 +218,75 @@
*/
StatefulDecoder getDecoder(ClientKey key)
{
+        /*
+         * Look up and, if absent, create the client decoder while holding the
+         * monitor of the decoders map. The lookup is deliberately not done
+         * before taking the lock: an unlocked read could run concurrently
+         * with the put() below, which is only safe when the map itself is
+         * thread-safe.
+         * NOTE(review): the declared type of decoders is not visible in this
+         * hunk - confirm whether other accessors also need the lock.
+         */
+        synchronized (decoders) {
+            StatefulDecoder decoder = (StatefulDecoder) decoders.get(key);
+            if (decoder == null) {
+                // first request for this client: build and register its decoder
+                decoder = createClientDecoder(key);
+                decoders.put(key, decoder);
+            }
+
+            return decoder;
+        }
+    }
+
+    /**
+     * Stop-gap lookup that produces the protocol specific StatefulDecoder
+     * for a client. The transport (UDP for a UDPClientKey, TCP otherwise)
+     * and the local port of the key select the service entry whose protocol
+     * provider supplies the decoder factory.
+     *
+     * @param key the client key used to determine associated protocol
+     * @return the new protocol specific decoder
+     * @throws RuntimeException if no service entry on the client's local
+     * port matches the client's transport
+     */
+    private StatefulDecoder createDecoder(ClientKey key)
+    {
+        // TCP unless the key says the client came in over UDP
+        TransportTypeEnum wanted = TransportTypeEnum.TCP;
+        if (key instanceof UDPClientKey) {
+            wanted = TransportTypeEnum.UDP;
+        }
+
+        ProtocolProvider match = null;
+        int localPort = key.getLocalAddress().getPort();
+        for (Iterator entries = inetdb.getByPort(localPort); entries.hasNext();) {
+            InetServiceEntry candidate = (InetServiceEntry) entries.next();
+            if (candidate.getTransport() != wanted) {
+                continue;
+            }
+
+            // no break: if several entries match, the last one is used
+            match = candidate.getProtocolProvider();
+        }
+
+        // TODO replace RuntimeException with ProtocolProviderNotFoundException
+        if (match == null) {
+            throw new RuntimeException("No protocol provider available");
+        }
+
+        return match.getDecoderFactory().createDecoder();
+    }
+
+ /**
+ * Builds the decoder for one client: wraps the protocol specific decoder
+ * from createDecoder(key) in a ClientDecoder carrying the key and installs
+ * a callback that publishes every decoded object on the router as a
+ * RequestEvent. The decoder is only returned here; it is not stored in
+ * the decoders map by this method.
+ *
+ * @param key the key of the client the decoder is created for
+ * @return the ClientDecoder with its callback installed
+ */
+ private StatefulDecoder createClientDecoder(ClientKey key)
+ {
StatefulDecoder decoder = null;
+ decoder = new ClientDecoder(key, createDecoder(key));
+
/*
- * We synchronize on the decoders map so we can wait for notification
- * on it if it does not contain the client decoder. This is in case
- * the processing of the connect event is slow and the client decoder
- * has not been created yet. When processing of the ConnectEvent is
- * complete we are awoken via a notifyAll in the inform() method.
+ * Here the decoder informs us that a unit of data is decoded. In the
+ * case of the snickers decoder we're decoding an LDAP message envelope
+ * for a request. We use this request to create a RequestEvent and
+ * publish the event on the queue.
*/
- synchronized (decoders)
- {
- decoder = (StatefulDecoder) decoders.get(key);
-
- while (decoder == null)
+ decoder.setCallback(new DecoderCallback()
{
- try
- {
- decoders.wait();
- }
- catch (InterruptedException e)
+ public void decodeOccurred(
+ StatefulDecoder decoder,
+ Object decoded)
{
- e.printStackTrace();
+ // the decoder passed in is expected to be the ClientDecoder built above
+ ClientKey key = ((ClientDecoder) decoder).getClientKey();
+ // NOTE(review): "this" is the anonymous DecoderCallback, not the
+ // enclosing DefaultDecoderManager - confirm the intended event source
+ RequestEvent event = new RequestEvent(this, key, decoded);
+ router.publish(event);
}
-
- decoder = (StatefulDecoder) decoders.get(key);
- }
- }
+ });
return decoder;
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java Thu Nov 4 08:26:01 2004
@@ -27,8 +27,6 @@
import org.apache.commons.codec.stateful.EncoderFactory;
import org.apache.commons.codec.stateful.StatefulEncoder;
import org.apache.seda.event.AbstractSubscriber;
-import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.ConnectSubscriber;
import org.apache.seda.event.DisconnectEvent;
import org.apache.seda.event.DisconnectSubscriber;
import org.apache.seda.event.EventRouter;
@@ -37,7 +35,6 @@
import org.apache.seda.event.ResponseSubscriber;
import org.apache.seda.event.filter.EventTypeFilter;
import org.apache.seda.listener.ClientKey;
-import org.apache.seda.listener.KeyExpiryException;
import org.apache.seda.listener.UDPClientKey;
import org.apache.seda.protocol.InetServiceEntry;
import org.apache.seda.protocol.InetServicesDatabase;
@@ -57,7 +54,7 @@
* @version $Rev$
*/
public class DefaultEncoderManager extends DefaultStage
- implements EncoderManager, ConnectSubscriber, ResponseSubscriber,
+ implements EncoderManager, ResponseSubscriber,
DisconnectSubscriber
{
/** the event router used to publish and subscribe to events on */
@@ -88,7 +85,6 @@
monitor = new EncoderManagerMonitorAdapter();
this.inetdb = inetdb;
this.router = router;
- this.router.subscribe(new EventTypeFilter(ConnectEvent.class), this);
this.router.subscribe(new EventTypeFilter(ResponseEvent.class), this);
}
@@ -127,6 +123,66 @@
super.enqueue(event);
}
+    /**
+     * Encodes a response synchronously and returns the result. A fresh
+     * encoder is created for every call from the factory registered under
+     * the protocol that the services database reports for the client's
+     * local port; the per-client encoders map is not consulted.
+     *
+     * @param key the key of the client the response is destined for
+     * @param response the response object to encode
+     * @return the buffer handed to the encoder callback
+     * @throws EncoderException if no factory is registered for the protocol,
+     * or if the encoder did not deliver an encoded object
+     */
+    public ByteBuffer encodeBlocking(ClientKey key, Object response)
+        throws EncoderException
+    {
+        int port = key.getLocalAddress().getPort();
+
+        EncoderFactory factory =
+            (EncoderFactory) factories.get(inetdb.getProtoByPort(port));
+        if (factory == null)
+        {
+            // report a missing factory as a declared, descriptive failure
+            // rather than a bare NullPointerException on the next line
+            throw new EncoderException("No encoder factory registered for"
+                + " the protocol served on port " + port);
+        }
+
+        StatefulEncoder encoder =
+            new ClientEncoder(key, factory.createEncoder());
+
+        // one element array lets the anonymous callback hand the result back
+        final Object[] encoded = new Object[1];
+        encoder.setCallback(new EncoderCallback()
+        {
+            public void encodeOccurred(StatefulEncoder encoder, Object obj)
+            {
+                encoded[0] = obj;
+            }
+        });
+
+        encoder.encode(response);
+
+        // the encoded value should be set
+        if (encoded[0] == null)
+        {
+            throw new EncoderException("Expected a complete encoded object" +
+                " but encoder did not produce one");
+        }
+
+        return (ByteBuffer) encoded[0];
+    }
+
+    /**
+     * Encodes a response with the encoder associated with the client. The
+     * result is not returned; it is delivered through the callback installed
+     * on that encoder.
+     *
+     * @param key the key of the client the response is destined for
+     * @param response the response object to encode
+     * @throws EncoderException propagated from the encoder's encode call
+     */
+    public void encodeNonBlocking(ClientKey key, Object response)
+        throws EncoderException
+    {
+        // Encoders are created on demand by getEncoder() now that this class
+        // no longer subscribes to ConnectEvent, so a raw encoders.get(key)
+        // could return null for a client that has not been encoded for yet.
+        StatefulEncoder encoder = getEncoder(key);
+        encoder.encode(response);
+    }
+
+ /**
+ * Gets the monitor currently held by this encoder manager.
+ *
+ * @return the value of the monitor field
+ */
+ EncoderManagerMonitor getMonitor()
+ {
+ return monitor;
+ }
+
+    /**
+     * Gets the encoder associated with a client, creating and registering
+     * one on the first request for that client.
+     *
+     * @param key the key of the client whose encoder is wanted
+     * @return the encoder stored for the key, newly created if none existed
+     */
+    StatefulEncoder getEncoder(ClientKey key)
+    {
+        /*
+         * Both the lookup and the creation happen while holding the monitor
+         * of the encoders map. Reading the map before taking the lock could
+         * run concurrently with the put() below, which is only safe when the
+         * map itself is thread-safe.
+         * NOTE(review): the declared type of encoders is not visible in this
+         * hunk - confirm whether other accessors also need the lock.
+         */
+        synchronized (encoders) {
+            StatefulEncoder encoder = (StatefulEncoder) encoders.get(key);
+            if (encoder == null) {
+                encoder = createClientEncoder(key);
+                encoders.put(key, encoder);
+            }
+
+            return encoder;
+        }
+    }
+
/**
* Temporary place holder for functionality that looks up a protocol
* specific StatefulEncoder.
@@ -135,7 +191,6 @@
* @return the new stateful nonblocking protocol specific encoder
*/
private StatefulEncoder createEncoder(ClientKey key)
- throws KeyExpiryException
{
TransportTypeEnum transportType;
if (key instanceof UDPClientKey) {
@@ -167,20 +222,9 @@
* @see org.apache.seda.event.ConnectSubscriber#inform(
* org.apache.seda.event.ConnectEvent)
*/
- public void inform(ConnectEvent event)
+ private StatefulEncoder createClientEncoder(ClientKey key)
{
- StatefulEncoder encoder = null;
- ClientKey key = event.getClientKey();
-
- try
- {
- encoder = new ClientEncoder(key, createEncoder(key));
- }
- catch (KeyExpiryException e)
- {
- monitor.failedOnInform(this, event, e);
- return;
- }
+ StatefulEncoder encoder = new ClientEncoder(key, createEncoder(key));
/*
* Here the encoder informs us that a response encoded.
@@ -196,55 +240,7 @@
router.publish(event);
}
});
- encoders.put(key, encoder);
- }
-
- public ByteBuffer encodeBlocking(ClientKey key, Object response)
- throws EncoderException
- {
- int port = key.getLocalAddress().getPort();
-
- EncoderFactory factory =
- (EncoderFactory) factories.get(inetdb.getProtoByPort(port));
- StatefulEncoder encoder =
- new ClientEncoder(key, factory.createEncoder());
-
- // used array to set a value on final variable and get by compiler
- final Object[] encoded = new Object[1];
- encoder.setCallback(new EncoderCallback()
- {
- public void encodeOccurred(StatefulEncoder encoder, Object obj)
- {
- encoded[0] = obj;
- }
- });
-
- encoder.encode(response);
-
- // the encoded value should be set
- if (encoded[0] == null)
- {
- throw new EncoderException("Expected a complete encoded object" +
- " but encoder did not produce one");
- }
-
- return (ByteBuffer) encoded[0];
- }
-
- public void encodeNonBlocking(ClientKey key, Object response)
- throws EncoderException
- {
- StatefulEncoder encoder = (StatefulEncoder) encoders.get(key);
- encoder.encode(response);
- }
-
- EncoderManagerMonitor getMonitor()
- {
- return monitor;
- }
-
- StatefulEncoder getEncoder(ClientKey key)
- {
- return (StatefulEncoder) encoders.get(key);
+
+ return encoder;
}
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java Thu Nov 4 08:26:01 2004
@@ -50,11 +50,7 @@
{
ResponseEvent re = (ResponseEvent) event;
ClientKey key = re.getClientKey();
- StatefulEncoder encoder;
-
- // FIXME Event synchronization issues
- while ((encoder = encMan.getEncoder(key)) == null)
- continue;
+ StatefulEncoder encoder = encMan.getEncoder(key);
try
{