You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/11 04:18:08 UTC
svn commit: r783607 [2/2] - in /activemq/sandbox/activemq-flow:
activemq-all/src/test/java/org/apache/activemq/broker/openwire/
activemq-bio/src/main/java/org/apache/activemq/transport/tcp/
activemq-broker/src/main/java/org/apache/activemq/apollo/ acti...
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -33,83 +33,68 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class MultiWireFormatFactory implements WireFormatFactory{
-
+public class MultiWireFormatFactory implements WireFormatFactory {
+
private static final Log LOG = LogFactory.getLog(MultiWireFormatFactory.class);
-
- private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
- private String wireFormats="openwire,stomp";
- private ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories;
-
- static public class WireFormatConnected {
- final private DiscriminatableWireFormatFactory wireFormatFactory;
- final private WireFormat wireFormat;
-
- public WireFormatConnected(DiscriminatableWireFormatFactory wireFormatFactory, WireFormat wireFormat) {
- this.wireFormatFactory = wireFormatFactory;
- this.wireFormat = wireFormat;
- }
+ private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
- public DiscriminatableWireFormatFactory getWireFormatFactory() {
- return wireFormatFactory;
- }
+ private String wireFormats;
+ private ArrayList<WireFormatFactory> wireFormatFactories;
- public WireFormat getWireFormat() {
- return wireFormat;
- }
- }
-
static class MultiWireFormat implements WireFormat {
- ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+ public static final String WIREFORMAT_NAME = "multi";
+
+ ArrayList<WireFormatFactory> wireFormatFactories = new ArrayList<WireFormatFactory>();
WireFormat wireFormat;
int maxHeaderLength;
-
+
public int getVersion() {
return 0;
}
+
public boolean inReceive() {
return wireFormat.inReceive();
}
+
public void setVersion(int version) {
wireFormat.setVersion(version);
}
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
private ByteArrayInputStream peeked;
-
+
public Object unmarshal(DataInput in) throws IOException {
- while( wireFormat == null ) {
-
- int readByte = ((InputStream)in).read();
- if( readByte < 0 ) {
+ while (wireFormat == null) {
+
+ int readByte = ((InputStream) in).read();
+ if (readByte < 0) {
throw new EOFException();
}
baos.write(readByte);
-
+
// Try to discriminate what we have read so far.
- for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
- if( wff.matchesWireformatHeader(baos.toByteSequence()) ) {
+ for (WireFormatFactory wff : wireFormatFactories) {
+ if (wff.matchesWireformatHeader(baos.toByteSequence())) {
wireFormat = wff.createWireFormat();
- peeked = new ByteArrayInputStream(baos.toByteSequence());
- return new WireFormatConnected(wff, wireFormat);
+ break;
}
}
-
- if( baos.size() >= maxHeaderLength ) {
+
+ if (baos.size() >= maxHeaderLength) {
throw new IOException("Could not discriminate the protocol.");
}
}
-
+
// If we have some peeked data we need to feed that back.. Only happens
// for the first few bytes of the protocol header.
- if( peeked!=null ) {
- in = new DataInputStream( new ConcatInputStream(peeked, (InputStream)in) );
+ if (peeked != null) {
+ in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in));
Object rc = wireFormat.unmarshal(in);
- if( peeked.available() <= 0 ) {
- peeked=null;
+ if (peeked.available() <= 0) {
+ peeked = null;
}
return rc;
}
@@ -117,7 +102,6 @@
return wireFormat.unmarshal(in);
}
-
public void marshal(Object command, DataOutput out) throws IOException {
wireFormat.marshal(command, out);
}
@@ -125,34 +109,51 @@
public ByteSequence marshal(Object command) throws IOException {
throw new UnsupportedOperationException();
}
+
public Object unmarshal(ByteSequence packet) throws IOException {
throw new UnsupportedOperationException();
}
- public ArrayList<DiscriminatableWireFormatFactory> getWireFormatFactories() {
+
+ public ArrayList<WireFormatFactory> getWireFormatFactories() {
return wireFormatFactories;
}
- public void setWireFormatFactories(ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories) {
+
+ private void setWireFormatFactories(ArrayList<WireFormatFactory> wireFormatFactories) {
this.wireFormatFactories = wireFormatFactories;
- maxHeaderLength=0;
- for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
- maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength());
+ maxHeaderLength = 0;
+ for (WireFormatFactory wff : wireFormatFactories) {
+ maxHeaderLength = Math.max(maxHeaderLength, wff.maxWireformatHeaderLength());
}
}
+
public Transport createTransportFilters(Transport transport, Map options) {
return transport;
}
+
+ public String getName() {
+ if (wireFormat == null) {
+ return WIREFORMAT_NAME;
+ } else {
+ return wireFormat.getName();
+ }
+ }
}
-
+
public WireFormat createWireFormat() {
MultiWireFormat rc = new MultiWireFormat();
- if( wireFormatFactories == null ) {
- wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+ if (wireFormatFactories == null) {
+ wireFormatFactories = new ArrayList<WireFormatFactory>();
String[] formats = getWireFormats().split("\\,");
for (int i = 0; i < formats.length; i++) {
try {
- wireFormatFactories.add((DiscriminatableWireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(formats[i]));
+ WireFormatFactory wff = (WireFormatFactory) WIREFORMAT_FACTORY_FINDER.newInstance(formats[i]);
+ if (wff.isDiscriminatable()) {
+ wireFormatFactories.add(wff);
+ } else {
+ throw new Exception("Not Discriminitable");
+ }
} catch (Exception e) {
- LOG.warn("Invalid wireformat '"+formats[i]+"': "+e.getMessage());
+ LOG.warn("Invalid wireformat '" + formats[i] + "': " + e.getMessage());
}
}
}
@@ -168,5 +169,36 @@
this.wireFormats = formats;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+ */
+ public boolean isDiscriminatable() {
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader
+ * (org.apache.activemq.util.ByteSequence)
+ */
+ public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength
+ * ()
+ */
+ public int maxWireformatHeaderLength() {
+ throw new UnsupportedOperationException();
+ }
+
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Thu Jun 11 02:18:07 2009
@@ -39,6 +39,7 @@
*/
public class ObjectStreamWireFormat implements WireFormat {
+ public static final String WIREFORMAT_NAME = "object";
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(baos);
@@ -52,7 +53,7 @@
}
public void marshal(Object command, DataOutput ds) throws IOException {
- ObjectOutputStream out = new ObjectOutputStream((OutputStream)ds);
+ ObjectOutputStream out = new ObjectOutputStream((OutputStream) ds);
out.writeObject(command);
out.flush();
out.reset();
@@ -60,13 +61,13 @@
public Object unmarshal(DataInput ds) throws IOException {
try {
- ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream((InputStream)ds);
+ ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream((InputStream) ds);
Object command;
command = in.readObject();
in.close();
return command;
} catch (ClassNotFoundException e) {
- throw (IOException)new IOException("unmarshal failed: " + e).initCause(e);
+ throw (IOException) new IOException("unmarshal failed: " + e).initCause(e);
}
}
@@ -77,10 +78,14 @@
return 0;
}
- public boolean inReceive() {
- // TODO implement the inactivity monitor
- return false;
- }
+ public String getName() {
+ return WIREFORMAT_NAME;
+ }
+
+ public boolean inReceive() {
+ // TODO implement the inactivity monitor
+ return false;
+ }
public Transport createTransportFilters(Transport transport, Map options) {
return transport;
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Thu Jun 11 02:18:07 2009
@@ -65,6 +65,11 @@
int getVersion();
/**
+ * @return The name of the wireformat
+ */
+ String getName();
+
+ /**
* @return true if message is being received
*/
boolean inReceive();
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -16,6 +16,36 @@
*/
package org.apache.activemq.wireformat;
+import org.apache.activemq.util.ByteSequence;
+
public interface WireFormatFactory {
- WireFormat createWireFormat();
+
+ /**
+ * @return an instance of the wire format.
+ *
+ */
+ WireFormat createWireFormat();
+
+ /**
+ * @return true if this wire format factory is isDiscriminatable. A discriminatable
+ * WireFormat's will first write a header to the stream
+ */
+ boolean isDiscriminatable();
+
+ /**
+ * @return Returns the maximum length of the header used to discriminate the wire format if it
+ * {@link #isDiscriminatable()}
+ * @throws UnsupportedOperationException If {@link #isDiscriminatable()} is false
+ */
+ int maxWireformatHeaderLength();
+
+ /**
+ * Called to test if this wireformat matches the provided header.
+ *
+ * @param byteSequence The byte sequence representing the herader data read so far.
+ * @return True if the ByteSequence matches the wire format header.
+ */
+ boolean matchesWireformatHeader(ByteSequence byteSequence);
+
+
}