You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/11 04:18:08 UTC

svn commit: r783607 [2/2] - in /activemq/sandbox/activemq-flow: activemq-all/src/test/java/org/apache/activemq/broker/openwire/ activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ activemq-broker/src/main/java/org/apache/activemq/apollo/ acti...

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -33,83 +33,68 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class MultiWireFormatFactory implements WireFormatFactory{
-    
+public class MultiWireFormatFactory implements WireFormatFactory {
+
     private static final Log LOG = LogFactory.getLog(MultiWireFormatFactory.class);
-    
-    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
 
-    private String wireFormats="openwire,stomp";
-    private ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories;
-    
-    static public class WireFormatConnected {
-        final private DiscriminatableWireFormatFactory wireFormatFactory;
-        final private WireFormat wireFormat;
-        
-        public WireFormatConnected(DiscriminatableWireFormatFactory wireFormatFactory, WireFormat wireFormat) {
-            this.wireFormatFactory = wireFormatFactory;
-            this.wireFormat = wireFormat;
-        }
+    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
 
-        public DiscriminatableWireFormatFactory getWireFormatFactory() {
-            return wireFormatFactory;
-        }
+    private String wireFormats;
+    private ArrayList<WireFormatFactory> wireFormatFactories;
 
-        public WireFormat getWireFormat() {
-            return wireFormat;
-        }
-    }
-    
     static class MultiWireFormat implements WireFormat {
 
-        ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+        public static final String WIREFORMAT_NAME = "multi";
+
+        ArrayList<WireFormatFactory> wireFormatFactories = new ArrayList<WireFormatFactory>();
         WireFormat wireFormat;
         int maxHeaderLength;
-        
+
         public int getVersion() {
             return 0;
         }
+
         public boolean inReceive() {
             return wireFormat.inReceive();
         }
+
         public void setVersion(int version) {
             wireFormat.setVersion(version);
         }
 
         private ByteArrayOutputStream baos = new ByteArrayOutputStream();
         private ByteArrayInputStream peeked;
-        
+
         public Object unmarshal(DataInput in) throws IOException {
 
-            while( wireFormat == null ) {
-                
-                int readByte = ((InputStream)in).read();
-                if( readByte < 0 ) {
+            while (wireFormat == null) {
+
+                int readByte = ((InputStream) in).read();
+                if (readByte < 0) {
                     throw new EOFException();
                 }
                 baos.write(readByte);
-                
+
                 // Try to discriminate what we have read so far.
-                for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
-                    if( wff.matchesWireformatHeader(baos.toByteSequence()) ) {
+                for (WireFormatFactory wff : wireFormatFactories) {
+                    if (wff.matchesWireformatHeader(baos.toByteSequence())) {
                         wireFormat = wff.createWireFormat();
-                        peeked = new ByteArrayInputStream(baos.toByteSequence());
-                        return new WireFormatConnected(wff, wireFormat);
+                        break;
                     }
                 }
-                
-                if( baos.size() >= maxHeaderLength ) {
+
+                if (baos.size() >= maxHeaderLength) {
                     throw new IOException("Could not discriminate the protocol.");
                 }
             }
-            
+
             // If we have some peeked data we need to feed that back..  Only happens
             // for the first few bytes of the protocol header.
-            if( peeked!=null ) {
-                in = new DataInputStream( new ConcatInputStream(peeked, (InputStream)in) );
+            if (peeked != null) {
+                in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in));
                 Object rc = wireFormat.unmarshal(in);
-                if( peeked.available() <= 0 ) {
-                    peeked=null;
+                if (peeked.available() <= 0) {
+                    peeked = null;
                 }
                 return rc;
             }
@@ -117,7 +102,6 @@
             return wireFormat.unmarshal(in);
         }
 
-        
         public void marshal(Object command, DataOutput out) throws IOException {
             wireFormat.marshal(command, out);
         }
@@ -125,34 +109,51 @@
         public ByteSequence marshal(Object command) throws IOException {
             throw new UnsupportedOperationException();
         }
+
         public Object unmarshal(ByteSequence packet) throws IOException {
             throw new UnsupportedOperationException();
         }
-        public ArrayList<DiscriminatableWireFormatFactory> getWireFormatFactories() {
+
+        public ArrayList<WireFormatFactory> getWireFormatFactories() {
             return wireFormatFactories;
         }
-        public void setWireFormatFactories(ArrayList<DiscriminatableWireFormatFactory> wireFormatFactories) {
+
+        private void setWireFormatFactories(ArrayList<WireFormatFactory> wireFormatFactories) {
             this.wireFormatFactories = wireFormatFactories;
-            maxHeaderLength=0;
-            for (DiscriminatableWireFormatFactory wff : wireFormatFactories) {
-                maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength());
+            maxHeaderLength = 0;
+            for (WireFormatFactory wff : wireFormatFactories) {
+                maxHeaderLength = Math.max(maxHeaderLength, wff.maxWireformatHeaderLength());
             }
         }
+
         public Transport createTransportFilters(Transport transport, Map options) {
             return transport;
         }
+
+        public String getName() {
+            if (wireFormat == null) {
+                return WIREFORMAT_NAME;
+            } else {
+                return wireFormat.getName();
+            }
+        }
     }
-        
+
     public WireFormat createWireFormat() {
         MultiWireFormat rc = new MultiWireFormat();
-        if( wireFormatFactories == null ) {
-            wireFormatFactories = new ArrayList<DiscriminatableWireFormatFactory>();
+        if (wireFormatFactories == null) {
+            wireFormatFactories = new ArrayList<WireFormatFactory>();
             String[] formats = getWireFormats().split("\\,");
             for (int i = 0; i < formats.length; i++) {
                 try {
-                    wireFormatFactories.add((DiscriminatableWireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(formats[i]));
+                    WireFormatFactory wff = (WireFormatFactory) WIREFORMAT_FACTORY_FINDER.newInstance(formats[i]);
+                    if (wff.isDiscriminatable()) {
+                        wireFormatFactories.add(wff);
+                    } else {
+                        throw new Exception("Not Discriminitable");
+                    }
                 } catch (Exception e) {
-                    LOG.warn("Invalid wireformat '"+formats[i]+"': "+e.getMessage());
+                    LOG.warn("Invalid wireformat '" + formats[i] + "': " + e.getMessage());
                 }
             }
         }
@@ -168,5 +169,36 @@
         this.wireFormats = formats;
     }
 
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable()
+     */
+    public boolean isDiscriminatable() {
+        return false;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader
+     * (org.apache.activemq.util.ByteSequence)
+     */
+    public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        throw new UnsupportedOperationException();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength
+     * ()
+     */
+    public int maxWireformatHeaderLength() {
+        throw new UnsupportedOperationException();
+    }
+
 
 }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Thu Jun 11 02:18:07 2009
@@ -39,6 +39,7 @@
  */
 public class ObjectStreamWireFormat implements WireFormat {
 
+    public static final String WIREFORMAT_NAME = "object";
     public ByteSequence marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream ds = new DataOutputStream(baos);
@@ -52,7 +53,7 @@
     }
 
     public void marshal(Object command, DataOutput ds) throws IOException {
-        ObjectOutputStream out = new ObjectOutputStream((OutputStream)ds);
+        ObjectOutputStream out = new ObjectOutputStream((OutputStream) ds);
         out.writeObject(command);
         out.flush();
         out.reset();
@@ -60,13 +61,13 @@
 
     public Object unmarshal(DataInput ds) throws IOException {
         try {
-            ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream((InputStream)ds);
+            ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream((InputStream) ds);
             Object command;
             command = in.readObject();
             in.close();
             return command;
         } catch (ClassNotFoundException e) {
-            throw (IOException)new IOException("unmarshal failed: " + e).initCause(e);
+            throw (IOException) new IOException("unmarshal failed: " + e).initCause(e);
         }
     }
 
@@ -77,10 +78,14 @@
         return 0;
     }
 
-	public boolean inReceive() {
-		// TODO implement the inactivity monitor
-		return false;
-	}
+    public String getName() {
+        return WIREFORMAT_NAME;
+    }
+
+    public boolean inReceive() {
+        // TODO implement the inactivity monitor
+        return false;
+    }
 
     public Transport createTransportFilters(Transport transport, Map options) {
         return transport;

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Thu Jun 11 02:18:07 2009
@@ -65,6 +65,11 @@
     int getVersion();
     
     /**
+     * @return The name of the wireformat
+     */
+    String getName();
+    
+    /**
      * @return true if message is being received
      */
     boolean inReceive();

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java Thu Jun 11 02:18:07 2009
@@ -16,6 +16,36 @@
  */
 package org.apache.activemq.wireformat;
 
+import org.apache.activemq.util.ByteSequence;
+
 public interface WireFormatFactory {
-    WireFormat createWireFormat();    
+    
+    /**
+     * @return an instance of the wire format. 
+     * 
+     */
+    WireFormat createWireFormat();
+    
+    /**
+     * @return true if this wire format factory is discriminatable. A discriminatable
+     * WireFormat will first write a header to the stream.
+     */
+    boolean isDiscriminatable();
+    
+    /**
+     * @return Returns the maximum length of the header used to discriminate the wire format if it
+     * {@link #isDiscriminatable()}
+     * @throws UnsupportedOperationException If {@link #isDiscriminatable()} is false
+     */
+    int maxWireformatHeaderLength();
+
+    /**
+     * Called to test if this wireformat matches the provided header.
+     * 
+     * @param byteSequence The byte sequence representing the header data read so far.
+     * @return True if the ByteSequence matches the wire format header.
+     */
+    boolean matchesWireformatHeader(ByteSequence byteSequence);
+
+    
 }