You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC

svn commit: r507672 [2/16] - in /incubator/qpid/branches/qpid.0-9: gentools/src/org/apache/qpid/gentools/ gentools/templ.java/ gentools/templ.net/ java/ java/broker/ java/broker/bin/ java/broker/distribution/ java/broker/distribution/src/ java/broker/d...

Modified: incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java Wed Feb 14 12:02:03 2007
@@ -61,7 +61,9 @@
 	static private Method mbGetGenerateMethod;
 	static private Method mbMangledGetGenerateMethod;
 	static private Method mbParamListGenerateMethod;
+	static private Method mbPassedParamListGenerateMethod;
 	static private Method mbMangledParamListGenerateMethod;
+	static private Method mbMangledPassedParamListGenerateMethod;
 	static private Method mbBodyInitGenerateMethod;
 	static private Method mbMangledBodyInitGenerateMethod;
 	static private Method mbSizeGenerateMethod;
@@ -131,11 +133,22 @@
 			AmqpVersionSet.class, int.class, int.class, boolean.class); }
 		catch (NoSuchMethodException e) { e.printStackTrace(); }
 		
+		
+		try { mbPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
+			"generateMbPassedParamList", String.class, AmqpField.class,
+			AmqpVersionSet.class, int.class, int.class, boolean.class); }
+		catch (NoSuchMethodException e) { e.printStackTrace(); }
+		
 		try { mbMangledParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
 			"generateMbMangledParamList", AmqpField.class,
 			int.class, int.class, boolean.class); }
 		catch (NoSuchMethodException e) { e.printStackTrace(); }
 		
+		try { mbMangledPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
+			"generateMbMangledPassedParamList", AmqpField.class,
+			int.class, int.class, boolean.class); }
+		catch (NoSuchMethodException e) { e.printStackTrace(); }
+		
 		try { mbBodyInitGenerateMethod = JavaGenerator.class.getDeclaredMethod(
 			"generateMbBodyInit", String.class, AmqpField.class,
 			AmqpVersionSet.class, int.class, int.class, boolean.class); }
@@ -320,10 +333,10 @@
 			"EncodingUtils.writeUnsignedShort(buffer, #)",	// encode expression
 			"# = buffer.getUnsignedShort()")); 				// decode expression
 		typeMap.put("shortstr", new DomainInfo(
-			"String",										// Java code type
+			"AMQShortString",										// Java code type
 			"EncodingUtils.encodedShortStringLength(#)",	// size
 			"EncodingUtils.writeShortStringBytes(buffer, #)", // encode expression
-			"# = EncodingUtils.readShortString(buffer)"));	// decode expression
+			"# = EncodingUtils.readAMQShortString(buffer)"));	// decode expression
 		typeMap.put("table", new DomainInfo(
 			"FieldTable",									// Java code type
 			"EncodingUtils.encodedFieldTableLength(#)", 	// size
@@ -457,11 +470,11 @@
 		if (token.compareTo("${CLASS}") == 0 && thisClass != null)
 			return thisClass.name;
 		if (token.compareTo("${CLASS_ID_INIT}") == 0 && thisClass != null)
-			return generateIndexInitializer("classIdMap", thisClass.indexMap, 8);
+			return generateIndexInitializer("registerClassId", thisClass.indexMap, 8);
 		if (token.compareTo("${METHOD}") == 0 && method != null)
 			return method.name;
 		if (token.compareTo("${METHOD_ID_INIT}") == 0 && method != null)
-			return generateIndexInitializer("methodIdMap", method.indexMap, 8);
+			return generateIndexInitializer("registerMethodId", method.indexMap, 8);
 		if (token.compareTo("${FIELD}") == 0 && field != null)
 			return field.name;
 		
@@ -573,6 +586,16 @@
 			codeSnippet += fieldMap.parseFieldMap(mbParamListGenerateMethod,
 				mbMangledParamListGenerateMethod, 42, 4, this);
 		}
+
+		else if (token.compareTo("${mb_field_passed_parameter_list}") == 0)
+		{
+			// <cringe> The code generated by this is ugly... It puts a comma on a line by itself!
+			// TODO: Find a more elegant solution here sometime...
+			codeSnippet = fieldMap.size() > 0 ? Utils.createSpaces(42) + "," + cr : "";
+			// </cringe>
+			codeSnippet += fieldMap.parseFieldMap(mbPassedParamListGenerateMethod,
+				mbMangledPassedParamListGenerateMethod, 42, 4, this);
+		}
 		else if (token.compareTo("${mb_field_body_initialize}") == 0)
 		{
 			codeSnippet = fieldMap.parseFieldMap(mbBodyInitGenerateMethod,
@@ -712,16 +735,12 @@
 		String indent = Utils.createSpaces(indentSize);
 		StringBuffer sb = new StringBuffer();
 		
-		Iterator<Integer> iItr = indexMap.keySet().iterator();
-		while (iItr.hasNext())
+        for (Integer index : indexMap.keySet())
 		{
-			int index = iItr.next();
 			AmqpVersionSet versionSet = indexMap.get(index);
-			Iterator<AmqpVersion> vItr = versionSet.iterator();
-			while (vItr.hasNext())
+            for (AmqpVersion version : versionSet)
 			{
-				AmqpVersion version = vItr.next();
-				sb.append(indent + mapName + ".put(\"" + version.toString() + "\", " + index + ");" + cr);
+				sb.append(indent + mapName + "( (byte) " + version.getMajor() +", (byte) " + version.getMinor() + ", " + index + ");" + cr);
 			}
 		}
 		return sb.toString();		
@@ -746,12 +765,12 @@
 					{
 						int classIndex = findIndex(thisClass.indexMap, version);
 						int methodIndex = findIndex(method.indexMap, version);
-						sb.append(indent + "classIDMethodIDVersionBodyMap.put(" + cr);
-						sb.append(indent + tab + "createMapKey((short)" + classIndex +
-								", (short)" + methodIndex + ", (byte)" + version.getMajor() +
-								", (byte)" + version.getMinor() + "), " + cr);
+						sb.append(indent + "registerMethod(" + cr);
+						sb.append(indent + tab + "(short)" + classIndex +
+							", (short)" + methodIndex + ", (byte)" + version.getMajor() +
+							", (byte)" + version.getMinor() + ", " + cr);
 						sb.append(indent + tab + Utils.firstUpper(thisClass.name) +
-								Utils.firstUpper(method.name) + "Body.class);" + cr);
+							Utils.firstUpper(method.name) + "Body.getFactory());" + cr);
 					}
 					catch (Exception e) {} // Ignore
 				}
@@ -924,6 +943,15 @@
 			(nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr;
 	}
 	
+	
+	protected String generateMbPassedParamList(String codeType, AmqpField field,
+		AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag)
+	{
+		return Utils.createSpaces(indentSize) + field.name +
+			(nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr;
+	}
+	
+	
 	protected String generateMbMangledParamList(AmqpField field, int indentSize,
 		int tabSize, boolean nextFlag)
 		throws AmqpTypeMappingException
@@ -943,10 +971,29 @@
 		return sb.toString();		
 	}
 	
+	protected String generateMbMangledPassedParamList(AmqpField field, int indentSize,
+		int tabSize, boolean nextFlag)
+		throws AmqpTypeMappingException
+	{
+		StringBuffer sb = new StringBuffer();
+		Iterator<String> dItr = field.domainMap.keySet().iterator();
+		int domainCntr = 0;
+		while (dItr.hasNext())
+		{
+			String domainName = dItr.next();
+			AmqpVersionSet versionSet = field.domainMap.get(domainName);
+			sb.append(Utils.createSpaces(indentSize) + field.name + "_" +
+				(domainCntr++) + (nextFlag ? "," : "") + " // AMQP version(s): " +
+				versionSet + cr);
+		}
+		return sb.toString();		
+	}
+	
+	
 	protected String generateMbBodyInit(String codeType, AmqpField field,
 		AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag)
 	{
-		return Utils.createSpaces(indentSize) + "bodyFrame." + field.name + " = " + field.name +
+		return Utils.createSpaces(indentSize) + "this." + field.name + " = " + field.name +
 			";" + cr;
 	}
 	
@@ -1004,23 +1051,26 @@
 		int ordinal, int indentSize, int tabSize)
 	{
 		String indent = Utils.createSpaces(indentSize);
-		String bitArrayName = "bitArray_" + ordinal;
-		StringBuffer sb = new StringBuffer(indent + "boolean[] " + bitArrayName +
-			" = new boolean[] { ");
-		for (int i=0; i<bitFieldList.size(); i++)
+	
+		StringBuilder sb = new StringBuilder();
+		int i = 0;
+		while(i <bitFieldList.size())
 		{
-			if (i != 0)
+		
+			StringBuilder line = new StringBuilder();
+			
+			for (int j=0; i<bitFieldList.size() && j<8; i++, j++)
 			{
-				if ((i + 3) % 6 == 0)
-					sb.append("," + cr + indent + Utils.createSpaces(tabSize));
-				else
-					sb.append(", ");
+				if (j != 0)
+				{				
+					line.append(", ");
+				}
+				line.append(bitFieldList.get(i));
 			}
-			sb.append(bitFieldList.get(i));
+			
+			sb.append(indent +
+				typeMap.get("bit").encodeExpression.replaceAll("#", line.toString()) + ";" + cr);
 		}
-		sb.append(" };" + cr);
-		sb.append(Utils.createSpaces(indentSize) +
-				typeMap.get("bit").encodeExpression.replaceAll("#", bitArrayName) + ";" + cr);
 		return sb.toString();
 	}
 
@@ -1038,14 +1088,25 @@
 		int ordinal, int indentSize, int tabSize)
 	{
 		String indent = Utils.createSpaces(indentSize);
-		String bitArrayName = "bitArray_" + ordinal;
-		StringBuffer sb = new StringBuffer();
-		sb.append(indent +
-			typeMap.get("bit").decodeExpression.replaceAll("#", "boolean[] " + bitArrayName) +
-			";" + cr);
-		for (int i=0; i<bitFieldList.size(); i++)
+		
+		StringBuilder sb = new StringBuilder(indent);
+        // Multiple occurrences of byte value blocks will result in multiple declarations
+        // unless each is named uniquely.
+        String varName = "packedValue_" + ordinal;
+		sb.append("byte " + varName + ";");
+		sb.append(cr);
+		
+		// RG HERE!
+		
+		int i = 0;
+		while(i < bitFieldList.size())
 		{
-			sb.append(indent + bitFieldList.get(i) + " = " + bitArrayName + "[" + i + "];" + cr);
+			sb.append(indent + varName + " = EncodingUtils.readByte(buffer);" + cr);
+			
+			for(int j = 0; i < bitFieldList.size() && j < 8; i++, j++)
+			{
+				sb.append(indent + bitFieldList.get(i) + " = ( " + varName + " & (byte) (1 << " + j + ") ) != 0;" + cr); 
+			}
 		}
 		return sb.toString();
 	}

Modified: incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java Wed Feb 14 12:02:03 2007
@@ -38,8 +38,11 @@
 {
     private static final String defaultOutDir = ".." + Utils.fileSeparator + "gen";
     private static final String defaultCppTemplateDir = ".." + Utils.fileSeparator + "templ.cpp";
+    private static final String defaultDotnetTemplateDir = ".." + Utils.fileSeparator + "templ.net";
     private static final String defaultJavaTemplateDir = ".." + Utils.fileSeparator + "templ.java";
     
+    private enum GeneratorLangEnum { CPP, DOTNET, JAVA }
+    
 	private DocumentBuilder docBuilder;
 	private AmqpVersionSet versionSet;
 	private Generator generator;
@@ -49,7 +52,7 @@
     
     private String outDir;
     private String tmplDir;
-    private boolean javaFlag;
+    private GeneratorLangEnum generatorLang;
     private ArrayList<String> xmlFiles;
     private File[] modelTemplateFiles;
     private File[] classTemplateFiles;
@@ -81,13 +84,20 @@
         // 0. Initialize
         outDir = defaultOutDir;
         tmplDir = null;
-        javaFlag = true;
+        generatorLang = GeneratorLangEnum.CPP; // Default generation language
         xmlFiles.clear();
         processArgs(args);
-		if (javaFlag)
-            prepareJava();
-        else
+        switch (generatorLang)
+        {
+        case JAVA:
+        	prepareJava();
+        	break;
+        case DOTNET:
+        	prepareDotnet();
+        	break;
+        default:
             prepareCpp();
+        }
 
 		if (modelTemplateFiles.length == 0 && classTemplateFiles.length == 0 &&
 			methodTemplateFiles.length == 0 && fieldTemplateFiles.length == 0)
@@ -128,11 +138,15 @@
                 {
                     case 'c':
                     case 'C':
-                        javaFlag = false;
+                    	generatorLang = GeneratorLangEnum.CPP;
                         break;
                     case 'j':
                     case 'J':
-                        javaFlag = true;
+                    	generatorLang = GeneratorLangEnum.JAVA;
+                        break;
+                    case 'n':
+                    case 'N':
+                    	generatorLang = GeneratorLangEnum.DOTNET;
                         break;
                     case 'o':
                     case 'O':
@@ -182,6 +196,30 @@
         };
     }
     
+    private void prepareDotnet()
+    {
+        if (tmplDir == null)
+            tmplDir = defaultDotnetTemplateDir;
+        System.out.println(".NET generation mode.");
+        generator = new DotnetGenerator(versionSet);
+        constants = new AmqpConstantSet(generator);
+        domainMap = new AmqpDomainMap(generator);
+        model = new AmqpModel(generator);       
+    	// TODO: Add templated that should be handled in here...
+       modelTemplateFiles = new File[]
+        {
+//          new File(tmplDir + Utils.fileSeparator + "XXXClass.tmpl"),
+        };
+        classTemplateFiles = new File[]
+        {
+//          new File(tmplDir + Utils.fileSeparator + "XXXClass.tmpl"),
+        };
+        methodTemplateFiles = new File[]
+        {
+//          new File(tmplDir + Utils.fileSeparator + "XXXClass.tmpl"),
+        };
+   }
+   
     private void prepareCpp()
     {
         if (tmplDir == null)
@@ -293,6 +331,7 @@
 		System.out.println("Usage: Main -c|-j [-o outDir] [-t tmplDir] XMLfile [XMLfile ...]");
 		System.out.println("       where -c:         Generate C++.");
 		System.out.println("             -j:         Generate Java.");
+		System.out.println("             -n:         Generate .NET.");
         System.out.println("             -o outDir:  Use outDir as the output dir (default=\"" + defaultOutDir + "\").");
         System.out.println("             -t tmplDir: Find templates in tmplDir.");
         System.out.println("                         Defaults: \"" + defaultCppTemplateDir + "\" for C++;");

Modified: incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl Wed Feb 14 12:02:03 2007
@@ -28,36 +28,107 @@
 
 package org.apache.qpid.framing;
 
-import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.mina.common.ByteBuffer;
 
 public class ${CLASS}${METHOD}Body extends AMQMethodBody implements EncodableAMQDataBlock
 {
-    public static final TreeMap<String, Integer> classIdMap = new TreeMap<String, Integer>();
-    public static final TreeMap<String, Integer> methodIdMap = new TreeMap<String, Integer>();
+    private static final AMQMethodBodyInstanceFactory factory = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new ${CLASS}${METHOD}Body(major, minor, in);
+        }
+		
+	    public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID, in);
+        }
+ 
+    };
+
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return factory;
+    }
 
+    public static HashMap<Integer, Integer> classIdMap = new HashMap<Integer, Integer>();
+    public static HashMap<Integer, Integer> methodIdMap = new HashMap<Integer, Integer>();
+    
+    private static void registerMethodId(byte major, byte minor, int methodId)
+    {
+        methodIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), methodId); 
+    }
+    
+    private static void registerClassId(byte major, byte minor, int classId)
+    {
+        classIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), classId); 
+    }
+    
+    
     static
     {
-${CLASS_ID_INIT}
-${METHOD_ID_INIT}
+        ${CLASS_ID_INIT}
+        ${METHOD_ID_INIT}
     }
 
     // Fields declared in specification
 %{FLIST}    ${field_declaration}
 
+    private final int _clazz;
+    private final int _method;
+
+    
     // Constructor
-    public ${CLASS}${METHOD}Body(byte major, byte minor)
+
+    public ${CLASS}${METHOD}Body(byte major, byte minor, ByteBuffer buffer) throws AMQFrameDecodingException
+    {
+		this(major, minor, getClazz(major,minor), getMethod(major,minor), buffer);
+	}
+	
+    public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer) throws AMQFrameDecodingException
     {
         super(major, minor);
-    }
-
-    public int getClazz() { return classIdMap.get(major + "-" + minor); }
-    public int getMethod() { return methodIdMap.get(major + "-" + minor); }
-    public static int getClazz(byte major, byte minor) { return classIdMap.get(major + "-" + minor); }
-    public static int getMethod(byte major, byte minor) { return methodIdMap.get(major + "-" + minor); }
-
-    // Field methods
+        _clazz = clazzID; 
+        _method = methodID; 
+%{FLIST}        ${mb_field_decode}
+    }
+    
+    public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID
+%{FLIST}        ${mb_field_parameter_list}
+                                         )
+    {
+        super(major, minor);
+        _clazz = getClazz(major,minor); 
+        _method = getMethod(major,minor); 
+%{FLIST}        ${mb_field_body_initialize}
+    }
+    
+    public int getClazz() 
+    { 
+        return _clazz; 
+    }
+    
+    public int getMethod() 
+    { 
+        return _method; 
+    }
+
+    public static int getClazz(byte major, byte minor) 
+    { 
+        return classIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8)); 
+    }
+    
+    public static int getMethod(byte major, byte minor) 
+    { 
+        return methodIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8)); 
+    }
+    
+    
+    // Field methods        
 %{FLIST}    ${mb_field_get_method}
 
     public int getBodySize()
@@ -88,17 +159,24 @@
 %{FLIST}    ${mb_field_parameter_list}
                                          )
     {
-        ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major, minor);
-%{FLIST}    ${mb_field_body_initialize}
+        return createMethodBody(major, minor, getClazz(major, minor), getMethod(major, minor)
+%{FLIST}    ${mb_field_passed_parameter_list}
+        );
+    }
 
-        return bodyFrame;
+    public static ${CLASS}${METHOD}Body createMethodBody(byte major, byte minor, int clazzID, int methodID
+%{FLIST}    ${mb_field_parameter_list}
+                                         )
+    {
+        return new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID
+%{FLIST}    ${mb_field_passed_parameter_list}
+        );
     }
 
     public ${CLASS}${METHOD}Body copy()
     {
-        ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major, minor);
-%{FLIST}    ${mb_field_body_initialize}
-
-        return bodyFrame;
+        return new ${CLASS}${METHOD}Body(major, minor, getClazz(major, minor), getMethod(major, minor)
+%{FLIST}    ${mb_field_passed_parameter_list}
+        );
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl Wed Feb 14 12:02:03 2007
@@ -31,50 +31,101 @@
 import java.util.HashMap;
 import java.lang.reflect.Constructor;
 import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
 
-class MainRegistry
+public class MainRegistry
 {
+	private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
+
+	
     private static final Logger _log = Logger.getLogger(MainRegistry.class);
-	private static HashMap<Long, Class> classIDMethodIDVersionBodyMap = new HashMap<Long, Class>();
 	
+
+    private static final int DEFAULT_MINOR_VERSION_COUNT = 10;
+    private static final int DEFAULT_MAJOR_VERSION_COUNT = 10;
+    
+    private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][];
+    	
     static
     {
 %{CLIST}	${reg_map_put_method}
     }
     
-    public static AMQMethodBody get(short classID, short methodID, byte major, byte minor)
+    public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
         throws AMQFrameDecodingException
     {
-		Class bodyClass = classIDMethodIDVersionBodyMap.get(
-		    createMapKey(classID, methodID, major, minor));
-		if (bodyClass == null)
-		{
-	    	throw new AMQFrameDecodingException(_log,
-	    	    "Unable to find a suitable decoder for class " + classID + " and method " +
-	    	    methodID + " in AMQP version " + major + "-" + minor + ".");
-	    }
-	    try
-	    {
-	    	Constructor initFn = bodyClass.getConstructor(byte.class, byte.class);
-	        return (AMQMethodBody) initFn.newInstance(major, minor);
-	    }
-	    catch (Exception e)
-	    {
-	    	throw new AMQFrameDecodingException(_log,
-	    	    "Unable to instantiate body class for class " + classID + " and method " +
-	    	    methodID + " in AMQP version " + major + "-" + minor + " : " + e, e);
-	    }
+		VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor);
+        AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID);
+		
+        if (bodyFactory == null)
+        {
+            throw new AMQFrameDecodingException(_log,
+                "Unable to find a suitable decoder for class " + classID + " and method " +
+                methodID + " in AMQP version " + major + "-" + minor + ".");
+        }
+        return bodyFactory.newInstance(major, minor, in, size);
+
+	    
     }
+	
+	public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor)
+	{
+		try
+		{
+			return _specificRegistries[(int)major][(int)minor];
+		}
+		catch (IndexOutOfBoundsException e)
+		{
+			return null;
+		}
+		catch (NullPointerException e)
+		{
+			return null;
+		}
+		
+		
+	}
     
-    private static Long createMapKey(short classID, short methodID, byte major, byte minor)
-    {
-    	/**
-         *	Mapping of 4 components into a guaranteed unique key:
-         *  MSB                                     LSB
-         *  +----+----+----+----+----+----+-----+-----+
-         *  |    0    | classID |methodID |major|minor|
-         *  +----+----+----+----+----+----+-----+-----+
-         */
-    	return new Long(((long)classID << 32) + ((long)methodID << 16) + ((long)major << 8) + minor);
+	private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor)
+	{
+		VersionSpecificRegistry[][] registries = _specificRegistries;
+		if(major >= registries.length)
+		{
+			_specificRegistries = new VersionSpecificRegistry[(int)major + 1][];
+			System.arraycopy(registries, 0, _specificRegistries, 0, registries.length);
+			registries = _specificRegistries;
+		}
+		if(registries[major] == null)
+		{
+			registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ];
+		}
+		else if(registries[major].length <= minor)
+		{
+			VersionSpecificRegistry[] minorArray = registries[major];
+			registries[major] = new VersionSpecificRegistry[ minor + 1 ];
+			System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length);
+			
+		}
+		
+		VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor);
+		
+		registries[major][minor] = newRegistry;
+		
+		return newRegistry;
+	}
+		
+	private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
+	{
+		VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor);
+		if(registry == null)
+		{
+			registry = addVersionSpecificRegistry(major,minor);
+			
+		}
+		
+		registry.registerMethod(classID, methodID, instanceFactory);
+        
     }
+
+    
 }

Propchange: incubator/qpid/branches/qpid.0-9/java/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Feb 14 12:02:03 2007
@@ -2,3 +2,6 @@
 blaze.iws
 build
 intellijclasses
+qpid.iml
+qpid.ipr
+qpid.iws

Propchange: incubator/qpid/branches/qpid.0-9/java/broker/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Feb 14 12:02:03 2007
@@ -2,3 +2,5 @@
 intellijclasses
 log
 target
+qpid-broker.ipr
+qpid-broker.iws

Modified: incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat Wed Feb 14 12:02:03 2007
@@ -17,7 +17,7 @@
 @REM under the License.
 @REM
 
-@echo off
+echo off
 REM Script to run the Qpid Java Broker
 
 rem Guess QPID_HOME if not defined
@@ -60,9 +60,11 @@
 shift
 goto loop
 
+rem QPID_OPTS intended to hold any -D props for use
+rem user must enclose any value for QPID_OPTS in double quotes
 :runCommand
 set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
 set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%\bin\java" -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+"%JAVA_HOME%\bin\java" -server -Xmx1024m %QPID_OPTS% -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
 
 :end

Propchange: incubator/qpid/branches/qpid.0-9/java/broker/distribution/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Feb 14 12:02:03 2007
@@ -0,0 +1 @@
+target

Modified: incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml Wed Feb 14 12:02:03 2007
@@ -79,6 +79,37 @@
             </mechanisms>
         </sasl>
     </security>
+	<virtualhosts>
+		<virtualhost>
+			<name>localhost</name>
+			<localhost>
+			    <store>
+					<!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>  -->
+					<class>org.apache.qpid.server.store.MemoryMessageStore</class>
+					<environment-path>localhost-store</environment-path>
+			    </store>		
+			</localhost>
+		</virtualhost>
+		
+		<virtualhost>
+			<name>development</name>
+			<development>
+			    <store>
+					<class>org.apache.qpid.server.store.MemoryMessageStore</class>
+			    </store>			
+			</development>
+		</virtualhost>
+		
+		<virtualhost>
+			<name>test</name>
+			<test>
+			    <store>
+					<class>org.apache.qpid.server.store.MemoryMessageStore</class>
+			    </store>			
+			</test>
+		</virtualhost>
+		
+	</virtualhosts>
     <heartbeat>
         <delay>0</delay>
         <timeoutFactor>2.0</timeoutFactor>
@@ -86,8 +117,6 @@
     <queue>
         <auto_register>true</auto_register>
     </queue>
-    <store>
-        <class>org.apache.qpid.server.store.MemoryMessageStore</class>
-    </store>
+
     <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
 </broker>

Modified: incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml Wed Feb 14 12:02:03 2007
@@ -8,9 +8,9 @@
  - to you under the Apache License, Version 2.0 (the
  - "License"); you may not use this file except in compliance
  - with the License.  You may obtain a copy of the License at
- - 
+ -
  -   http://www.apache.org/licenses/LICENSE-2.0
- - 
+ -
  - Unless required by applicable law or agreed to in writing,
  - software distributed under the License is distributed on an
  - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,14 +36,27 @@
             <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
         </layout>
     </appender>
+    
+    <!--
+    After merge with trunk, there seems to be considerable confusion over
+    this section, so all have been commented out for now...
+    -->
 
-    <category name="org.apache.qpid.framing">
+    <!--<category name="org.apache.qpid.framing">
         <priority value="debug"/>
     </category>
 
-    <category name="org.apache.qpid.server.handler">
+    <category name="org.apache.qpid.server.queue">
         <priority value="debug"/>
     </category>
+
+    <category name="org.apache.qpid.server.txn">
+        <priority value="debug"/>
+    </category>
+
+    <category name="org.apache.qpid.server.handler">
+        <priority value="debug"/>
+    </category>-->
 
     <root>
         <priority value="debug"/>

Modified: incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml Wed Feb 14 12:02:03 2007
@@ -21,7 +21,17 @@
  -->
 <virtualhosts>
     <virtualhost>
-        <path>/development</path>
+        <path>localhost</path>
+        <bind>direct://amq.direct//queue</bind>
+        <bind>direct://amq.direct//ping</bind>
+    </virtualhost>
+	<virtualhost>
+        <path>development</path>
+        <bind>direct://amq.direct//queue</bind>
+        <bind>direct://amq.direct//ping</bind>
+    </virtualhost>
+		<virtualhost>
+        <path>test</path>
         <bind>direct://amq.direct//queue</bind>
         <bind>direct://amq.direct//ping</bind>
     </virtualhost>

Modified: incubator/qpid/branches/qpid.0-9/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/pom.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/pom.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/pom.xml Wed Feb 14 12:02:03 2007
@@ -37,53 +37,44 @@
     </properties>
 
     <dependencies>
+
         <dependency>
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-common</artifactId>
         </dependency>
+
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>
+
         <dependency>
             <groupId>commons-configuration</groupId>
             <artifactId>commons-configuration</artifactId>
         </dependency>
+
         <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.geronimo.specs</groupId>
-            <artifactId>geronimo-jms_1.1_spec</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mina</groupId>
-            <artifactId>mina-filter-ssl</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mina</groupId>
-            <artifactId>mina-java5</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>backport-util-concurrent</groupId>
-            <artifactId>backport-util-concurrent</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
-        </dependency>
+
+	<!-- Test Dependencies -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.easymock</groupId>
             <artifactId>easymockclassextension</artifactId>
+            <scope>test</scope>
         </dependency>
+
     </dependencies>
 
     <build>
+
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -128,7 +119,32 @@
                     </systemProperties>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
+
+        <testResources>
+            <testResource>
+                <targetPath>src/</targetPath>
+                <filtering>false</filtering>
+                <directory>src/test/java</directory>
+                <includes>
+                    <include>**/*.java</include>
+                </includes>
+            </testResource>
+        </testResources>
+
     </build>
 
 </project>

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj Wed Feb 14 12:02:03 2007
@@ -66,8 +66,9 @@
 import java.io.*;
 import java.util.*;
 
-import javax.jms.InvalidSelectorException;
+import org.apache.qpid.AMQInvalidSelectorException;
 
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.filter.*;
 
 /**
@@ -81,14 +82,14 @@
         this(new StringReader(""));
     }
 
-    public BooleanExpression parse(String sql) throws InvalidSelectorException {
+    public BooleanExpression parse(String sql) throws AMQInvalidSelectorException {
         this.ReInit(new StringReader(sql));
 
         try {
             return this.JmsSelector();
         }
         catch (Throwable e) {
-	        throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e);
+	        throw (AMQInvalidSelectorException)new AMQInvalidSelectorException(sql).initCause(e);
         }
 
     }
@@ -589,7 +590,7 @@
     (
         t = <ID>
         {
-            left = new PropertyExpression(t.image);
+            left = new PropertyExpression(new AMQShortString(t.image));
         }
     )
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Wed Feb 14 12:02:03 2007
@@ -30,7 +30,9 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -49,20 +51,28 @@
     private final ExchangeFactory _exchangeFactory;
     private final MessageStore _messageStore;
 
+    private final VirtualHost.VirtualHostMBean _virtualHostMBean;
+
     @MBeanConstructor("Creates the Broker Manager MBean")
-    public AMQBrokerManagerMBean() throws JMException
+    public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException
     {
         super(ManagedBroker.class, ManagedBroker.TYPE);
-        IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
-        _queueRegistry = appRegistry.getQueueRegistry();
-        _exchangeRegistry = appRegistry.getExchangeRegistry();
-        _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
-        _messageStore = ApplicationRegistry.getInstance().getMessageStore();
+
+        _virtualHostMBean = virtualHostMBean;
+        VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
+
+
+
+
+        _queueRegistry = virtualHost.getQueueRegistry();
+        _exchangeRegistry = virtualHost.getExchangeRegistry();
+        _messageStore = virtualHost.getMessageStore();
+        _exchangeFactory = virtualHost.getExchangeFactory();
     }
 
     public String getObjectInstanceName()
     {
-        return this.getClass().getName();
+        return _virtualHostMBean.getVirtualHost().getName();
     }
 
     /**
@@ -81,10 +91,10 @@
         {
             synchronized (_exchangeRegistry)
             {
-                Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
+                Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
                 if (exchange == null)
                 {
-                    exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0);
+                    exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
                     _exchangeRegistry.registerExchange(exchange);
                 }
                 else
@@ -114,7 +124,7 @@
         // when there are no bindings.
         try
         {
-            _exchangeRegistry.unregisterExchange(exchangeName, false);
+            _exchangeRegistry.unregisterExchange(new AMQShortString(exchangeName), false);
         }
         catch (AMQException ex)
         {
@@ -135,7 +145,7 @@
     public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete)
             throws JMException
     {
-        AMQQueue queue = _queueRegistry.getQueue(queueName);
+        AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
         if (queue != null)
         {
             throw new JMException("The queue \"" + queueName + "\" already exists.");
@@ -143,7 +153,7 @@
 
         try
         {
-            queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
+            queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, getVirtualHost());
             if (queue.isDurable() && !queue.isAutoDelete())
             {
                 _messageStore.createQueue(queue);
@@ -156,6 +166,11 @@
         }
     }
 
+    private VirtualHost getVirtualHost()
+    {
+        return _virtualHostMBean.getVirtualHost();
+    }
+
     /**
      * Deletes the queue from queue registry and persistant storage.
      *
@@ -164,7 +179,7 @@
      */
     public void deleteQueue(String queueName) throws JMException
     {
-        AMQQueue queue = _queueRegistry.getQueue(queueName);
+        AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
         if (queue == null)
         {
             throw new JMException("The Queue " + queueName + " is not a registerd queue.");
@@ -173,7 +188,7 @@
         try
         {
             queue.delete();
-            _messageStore.removeQueue(queueName);
+            _messageStore.removeQueue(new AMQShortString(queueName));
 
         }
         catch (AMQException ex)
@@ -182,11 +197,14 @@
         }
     }
 
-    public ObjectName getObjectName() throws MalformedObjectNameException
+    public ManagedObject getParentObject()
     {
-        StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
-        objectName.append(":type=").append(getType());
+        return _virtualHostMBean;
+    }
 
-        return new ObjectName(objectName.toString());
+    // This will have a single instance for a virtual host, so not having the name property in the ObjectName
+    public ObjectName getObjectName() throws MalformedObjectNameException
+    {        
+        return getObjectNameForSingleInstanceMBean();
     }
 } // End of MBean class

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Feb 14 12:02:03 2007
@@ -27,7 +27,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.Content;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MessageAppendBody;
@@ -43,14 +43,21 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.AMQReference;
+import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.txn.TxnBuffer;
-import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.LocalTransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.mina.common.ByteBuffer;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -59,6 +66,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.Set;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -71,7 +79,7 @@
 
     private final int _channelId;
 
-    private boolean _transactional;
+    //private boolean _transactional;
 
     private long _prefetch_HighWaterMark;
 
@@ -81,6 +89,8 @@
     private ResponseManager _responseManager;
     private AMQProtocolSession _session;
 
+    private long _prefetchSize;
+
     /**
      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
      * value of this represents the <b>last</b> tag sent out
@@ -102,30 +112,33 @@
     /**
      * The set of open references on this channel.
      */
-    private Map<String, Reference> _references = new LinkedHashMap();
+    private Map<String, AMQReference> _references = new LinkedHashMap();
 
     /**
      * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
      */
-    private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
+    private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
 
     private final MessageStore _messageStore;
 
-    private final Object _unacknowledgedMessageMapLock = new Object();
-
-    private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
-
-    private long _lastDeliveryTag;
+    private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
     private final MessageRouter _exchanges;
 
-    private final TxnBuffer _txnBuffer;
+    private TransactionalContext _txnContext;
+
+    /**
+     * A context used by the message store enabling it to track context for a given channel even across
+     * thread boundaries
+     */
+    private final StoreContext _storeContext = new StoreContext();
+
+    private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
 
-    private TxAck ackOp;
+    private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
 
-    private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
     // XXX: clean up arguments
@@ -139,22 +152,29 @@
         _exchanges = exchanges;
         _requestManager = new RequestManager(_session.getConnectionId(), channelId, _session, true);
         _responseManager = new ResponseManager(_session.getConnectionId(), channelId, methodListener, _session, true);
-        _txnBuffer = new TxnBuffer(_messageStore);
+        // by default the session is non-transactional
+        _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
     }
 
-    public int getChannelId()
+    /**
+     * Sets this channel to be part of a local transaction
+     */
+    public void setLocalTransactional()
     {
-        return _channelId;
+        _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
     }
 
     public boolean isTransactional()
     {
-        return _transactional;
+        // this does not look great but there should only be one "non-transactional"
+        // transactional context, while there could be several transactional ones in
+        // theory
+        return !(_txnContext instanceof NonTransactionalContext);
     }
 
-    public void setTransactional(boolean transactional)
+    public int getChannelId()
     {
-        _transactional = transactional;
+        return _channelId;
     }
 
     public long getPrefetchCount()
@@ -167,6 +187,17 @@
         _prefetch_HighWaterMark = prefetchCount;
     }
 
+    public long getPrefetchSize()
+    {
+        return _prefetchSize;
+    }
+
+
+    public void setPrefetchSize(long prefetchSize)
+    {
+        _prefetchSize = prefetchSize;
+    }
+
     public long getPrefetchLowMarkCount()
     {
         return _prefetch_LowWaterMark;
@@ -193,80 +224,165 @@
         AMQMessage message;
         switch (body.getContentType()) {
         case INLINE_T:
-            message = new AMQMessage(_messageStore, transferBody,
-                                                Collections.singletonList(body.getContent()));
+            message = new AMQMessage(_messageStore, transferBody, Collections.singletonList(body.getContent()), _txnContext);
             message.setPublisher(publisher);
-            route(message);
+            routeCurrentMessage(message);
+            message.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
             break;
         case REF_T:
-            Reference ref = getReference(body.getContentAsByteArray());
-            message = new AMQMessage(_messageStore, transferBody, ref.contents);
+            AMQReference ref = getReference(body.getContentAsByteArray());
+            message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
             message.setPublisher(publisher);
-            ref.messages.add(message);
+            ref.addRefTransferBody(message);
             break;
         }
     }
 
-    private static String key(byte[] id) {
+    private static String key(byte[] id)
+    {
         return new String(id);
     }
 
-    private Reference getReference(byte[] id) {
+    private AMQReference getReference(byte[] id)
+    {
         String key = key(id);
-        Reference ref = _references.get(key);
-        if (ref == null) {
+        AMQReference ref = _references.get(key);
+        if (ref == null)
+        {
             throw new IllegalArgumentException(key);
         }
         return ref;
     }
 
-    private Reference createReference(byte[] id) {
+    private AMQReference createReference(byte[] id)
+    {
         String key = key(id);
-        if (_references.containsKey(key)) {
+        if (_references.containsKey(key))
+        {
             throw new IllegalArgumentException(key);
         }
-        Reference ref = new Reference();
+        AMQReference ref = new AMQReference(id);
         _references.put(key, ref);
         return ref;
     }
 
-    private Reference removeReference(byte[] id) {
+    private AMQReference removeReference(byte[] id)
+    {
         String key = key(id);
-        Reference ref = _references.remove(key);
-        if (ref == null) {
+        AMQReference ref = _references.remove(key);
+        if (ref == null)
+        {
             throw new IllegalArgumentException(key);
         }
         return ref;
     }
 
-    public void addMessageOpen(MessageOpenBody open) {
+    public void addMessageOpen(MessageOpenBody open)
+    {
         createReference(open.reference);
     }
 
-    public void addMessageAppend(MessageAppendBody append) {
-        Reference ref = getReference(append.reference);
-        ref.contents.add(ByteBuffer.wrap(append.bytes));
+    public void addMessageAppend(MessageAppendBody append)
+    {
+        AMQReference ref = getReference(append.reference);
+        ref.appendContent(ByteBuffer.wrap(append.bytes));
     }
 
-    public void addMessageClose(MessageCloseBody close) throws AMQException {
-        Reference ref = removeReference(close.reference);
-        for (AMQMessage msg : ref.messages) {
-            route(msg);
+    public void addMessageClose(MessageCloseBody close) throws AMQException
+    {
+        AMQReference ref = removeReference(close.reference);
+        for (AMQMessage msg : ref.getMessageList())
+        {
+            routeCurrentMessage(msg);
+            msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
         }
     }
 
-    public void deliver(AMQMessage msg, String destination, final long deliveryTag) {
-        deliver(msg, destination, new AMQMethodListener() {
-            public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
+    protected void routeCurrentMessage(AMQMessage msg) throws AMQException
+    {
+        try
+        {
+            _exchanges.routeContent(msg);
+        }
+        catch (NoRouteException e)
+        {
+            _returnMessages.add(e);
+        }
+    }
+// 
+//     public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
+//     {
+//         deliver(msg, destination, new AMQMethodListener()
+//         {
+//             public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+//             {
+//                 AMQMethodBody method = evt.getMethod();
+//                 if (_log.isDebugEnabled())
+//                 {
+//                     _log.debug(method + " received on channel " + _channelId);
+//                 }
+//                 // XXX: multiple?
+//                 if (method instanceof MessageOkBody)
+//                 {
+//                     acknowledgeMessage(deliveryTag, false);
+//                     return true;
+//                 }
+//                 else
+//                 {
+//                     // TODO: implement reject
+//                     return false;
+//                 }
+//             }
+//             public void error(Exception e) {}
+//         });
+//     }
+
+    public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
+    {
+        // Do we need to refactor the content for a different frame size?
+        long maxFrameSize = _session.getFrameMax();
+        Iterable<ByteBuffer> contentItr = msg.getContents();
+        if (msg.getSize() > maxFrameSize)
+        {
+            Iterator<ByteBuffer> cItr = contentItr.iterator();
+            if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size
+            {
+                // TODO - Refactor the chunks for smaller outbound frame size
+                throw new Error("XXX TODO - need to refactor content chunks here");
+                // deliverRef(msg, destination, deliveryTag);
+            }
+            else
+            {
+                // Use ref content as is - no need to refactor
+                deliverRef(msg, destination, deliveryTag);
+            }
+        }
+        else
+        {
+            // Concatenate - all incoming chunks will fit into single outbound frame
+            deliverInline(msg, destination, deliveryTag);
+        }
+    }
+    
+    public void deliverInline(AMQMessage msg, AMQShortString destination, final long deliveryTag)
+    {
+        deliverInline(msg, destination, new AMQMethodListener()
+        {
+            public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+            {
                 AMQMethodBody method = evt.getMethod();
-                if (_log.isDebugEnabled()) {
+                if (_log.isDebugEnabled())
+                {
                     _log.debug(method + " received on channel " + _channelId);
                 }
                 // XXX: multiple?
-                if (method instanceof MessageOkBody) {
+                if (method instanceof MessageOkBody)
+                {
                     acknowledgeMessage(deliveryTag, false);
                     return true;
-                } else {
+                }
+                else
+                {
                     // TODO: implement reject
                     return false;
                 }
@@ -275,70 +391,127 @@
         });
     }
 
-    public void deliver(AMQMessage msg, String destination, AMQMethodListener listener) {
-        // XXX: should reframe if necessary to conform to max frame size
+    public void deliverInline(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
+    {
         MessageTransferBody mtb = msg.getTransferBody().copy();
         mtb.destination = destination;
         ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize());
-        for (ByteBuffer bb : msg.getContents()) {
+        for (ByteBuffer bb : msg.getContents())
+        {
             buf.put(bb);
         }
         buf.flip();
         mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
         _session.writeRequest(_channelId, mtb, listener);
     }
-
-    protected void route(AMQMessage msg) throws AMQException
+    
+    public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
     {
-        if (_transactional)
+        final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
+        AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+            _session.getProtocolMajorVersion(), // AMQP major version
+            _session.getProtocolMinorVersion(), // AMQP minor version
+            refId);
+        _session.writeRequest(_channelId, openBody, new AMQMethodListener()
         {
-            //don't create a transaction unless needed
-            if (msg.isPersistent())
+            public boolean methodReceived(AMQMethodEvent evt) throws AMQException
             {
-                _txnBuffer.containsPersistentChanges();
-            }
-
-            //A publication will result in the enlisting of several
-            //TxnOps. The first is an op that will store the message.
-            //Following that (and ordering is important), an op will
-            //be added for every queue onto which the message is
-            //enqueued. Finally a cleanup op will be added to decrement
-            //the reference associated with the routing.
-            Store storeOp = new Store(msg);
-            _txnBuffer.enlist(storeOp);
-            msg.setTxnBuffer(_txnBuffer);
-            try
-            {
-                _exchanges.routeContent(msg);
-                _txnBuffer.enlist(new Cleanup(msg));
-            }
-            catch (RequiredDeliveryException e)
-            {
-                //Can only be due to the mandatory flag, as no attempt
-                //has yet been made to deliver the message. The
-                //message will thus not have been delivered to any
-                //queue so we can return the message (without killing
-                //the transaction) and for efficiency remove the store
-                //operation from the buffer.
-                _txnBuffer.cancel(storeOp);
-                throw e;
-            }
-        }
-        else
-        {
-            try
-            {
-                _exchanges.routeContent(msg);
-                //following check implements the functionality
-                //required by the 'immediate' flag:
-                msg.checkDeliveredToConsumer();
-            }
-            finally
-            {
-                msg.decrementReference();
+                AMQMethodBody method = evt.getMethod();
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug(method + " received on channel " + _channelId);
+                }
+                if (method instanceof MessageOkBody)
+                {
+                    acknowledgeMessage(deliveryTag, false);
+                    deliverRef(refId, msg, destination, _session.getStateManager());
+                    return true;
+                }
+                else
+                {
+                    // TODO: implement reject
+                    return false;
+                }
             }
-        }
+            public void error(Exception e) {}
+        });
     }
+    
+    public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
+    {
+        MessageTransferBody mtb = msg.getTransferBody().copy();
+        mtb.destination = destination;
+        mtb.body = new Content(Content.TypeEnum.REF_T, refId);
+        _session.writeRequest(_channelId, mtb, listener);
+        for (ByteBuffer bb : msg.getContents())
+        {
+            ByteBuffer dup = bb.duplicate();
+            byte[] ba = new byte[dup.limit()];
+            dup.get(ba);
+        	AMQMethodBody appendBody = MessageAppendBody.createMethodBody(
+                _session.getProtocolMajorVersion(), // AMQP major version
+                _session.getProtocolMinorVersion(), // AMQP minor version
+                ba,
+                refId);
+            _session.writeRequest(_channelId, appendBody, listener);
+        }
+        AMQMethodBody closeBody = MessageCloseBody.createMethodBody(
+            _session.getProtocolMajorVersion(), // AMQP major version
+            _session.getProtocolMinorVersion(), // AMQP minor version
+            refId);
+    }
+
+//     protected void route(AMQMessage msg) throws AMQException
+//     {
+//         if (isTransactional())
+//         {
+//             //don't create a transaction unless needed
+//             if (msg.isPersistent())
+//             {
+// //                _txnBuffer.containsPersistentChanges();
+//             }
+// 
+//             //A publication will result in the enlisting of several
+//             //TxnOps. The first is an op that will store the message.
+//             //Following that (and ordering is important), an op will
+//             //be added for every queue onto which the message is
+//             //enqueued. Finally a cleanup op will be added to decrement
+//             //the reference associated with the routing.
+// //             Store storeOp = new Store(msg);
+// //             _txnBuffer.enlist(storeOp);
+// //             msg.setTxnBuffer(_txnBuffer);
+//             try
+//             {
+//                 _exchanges.routeContent(msg);
+// //                 _txnBuffer.enlist(new Cleanup(msg));
+//             }
+//             catch (RequiredDeliveryException e)
+//             {
+//                 //Can only be due to the mandatory flag, as no attempt
+//                 //has yet been made to deliver the message. The
+//                 //message will thus not have been delivered to any
+//                 //queue so we can return the message (without killing
+//                 //the transaction) and for efficiency remove the store
+//                 //operation from the buffer.
+// //                 _txnBuffer.cancel(storeOp);
+//                 throw e;
+//             }
+//         }
+//         else
+//         {
+//             try
+//             {
+//                 _exchanges.routeContent(msg);
+//                 //following check implements the functionality
+//                 //required by the 'immediate' flag:
+//                 msg.checkDeliveredToConsumer();
+//             }
+//             finally
+//             {
+//                 msg.decrementReference(_storeContext);
+//             }
+//         }
+//     }
 
     public RequestManager getRequestManager()
     {
@@ -369,17 +542,19 @@
      * @param queue   the queue to subscribe to
      * @param session the protocol session of the subscriber
      * @param noLocal
+     * @param exclusive
      * @return the consumer tag. This is returned to the subscriber and used in
      *         subsequent unsubscribe requests
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
-    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
-                                   FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue,
+                        AMQProtocolSession session, boolean acks, FieldTable filters,
+                        boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
-            tag = "sgen_" + getNextConsumerTag();
+            tag = new AMQShortString("sgen_" + getNextConsumerTag());
         }
         if (_consumerTag2QueueMap.containsKey(tag))
         {
@@ -392,7 +567,7 @@
     }
 
 
-    public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException
+    public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
     {
         AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
         if (q != null)
@@ -408,21 +583,16 @@
      */
     public void close(AMQProtocolSession session) throws AMQException
     {
-        if (_transactional)
-        {
-            synchronized(_txnBuffer)
-            {
-                _txnBuffer.rollback();//releases messages
-            }
-        }
+        _txnContext.rollback();
         unsubscribeAllConsumers(session);
         requeue();
+		_txnContext.commit();
     }
 
     private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
     {
         _log.info("Unsubscribing all consumers on channel " + toString());
-        for (Map.Entry<String, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+        for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
         {
             me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
         }
@@ -432,16 +602,16 @@
     /**
      * Add a message to the channel-based list of unacknowledged messages
      *
-     * @param message
-     * @param deliveryTag
-     * @param queue
+     * @param message     the message that was delivered
+     * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
+     *                    the delivery tag)
+     * @param queue       the queue from which the message was delivered
      */
-    public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
+    public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
     {
-        synchronized(_unacknowledgedMessageMapLock)
+        synchronized (_unacknowledgedMessageMap.getLock())
         {
-            _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
-            _lastDeliveryTag = deliveryTag;
+            _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
             checkSuspension();
         }
     }
@@ -449,24 +619,19 @@
     /**
      * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
      * May result in delivery to this same channel or to other subscribers.
+     *
+     * @throws org.apache.qpid.AMQException if the requeue fails
      */
     public void requeue() throws AMQException
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
-        Map<Long, UnacknowledgedMessage> currentList;
-        synchronized(_unacknowledgedMessageMapLock)
-        {
-            currentList = _unacknowledgedMessageMap;
-            _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
-        }
+        Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
-        for (UnacknowledgedMessage unacked : currentList.values())
+        for (UnacknowledgedMessage unacked : messagesToBeDelivered)
         {
             if (unacked.queue != null)
             {
-                unacked.message.setTxnBuffer(null);
-
-                unacked.queue.deliver(unacked.message);
+                _txnContext.deliver(unacked.message, unacked.queue);
             }
         }
     }
@@ -474,53 +639,87 @@
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      */
-    public void resend(AMQProtocolSession session)
-    {
-        //messages go to this channel
-        synchronized(_unacknowledgedMessageMapLock)
-        {
-            for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
-            {
-                long deliveryTag = entry.getKey();
-                String consumerTag = entry.getValue().consumerTag;
-                AMQMessage msg = entry.getValue().message;
-                msg.setRedelivered(true);
-                deliver(msg, consumerTag, deliveryTag);
-            }
-        }
-    }
+     public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+     {
+        throw new Error("XXX");
+//         final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+// 
+//         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+//         {
+//             public boolean callback(UnacknowledgedMessage message) throws AMQException
+//             {
+//                 long deliveryTag = message.deliveryTag;
+//                 AMQShortString consumerTag = message.consumerTag;
+//                 AMQMessage msg = message.message;
+//                 msg.setRedelivered(true);
+//                 // working
+// //                deliver(msg, consumerTag, deliveryTag);
+//                 // trunk
+//                 if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+//                 {
+//                     msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+//                 }
+//                 else
+//                 {
+//                     // Message has no consumer tag, so was "delivered" to a GET
+//                     // or consumer no longer registered
+//                     // cannot resend, so re-queue.
+//                     if (message.queue != null && (consumerTag == null || requeue))
+//                     {
+//                         msgToRequeue.add(message);                         
+//                     }
+//                 }
+//                 // false means continue processing
+//                 return false;
+//             }
+// 
+//             public void visitComplete()
+//             {
+//             }
+//         });
+// 
+//         for(UnacknowledgedMessage message : msgToRequeue)
+//         {
+//             _txnContext.deliver(message.message, message.queue);
+//             _unacknowledgedMessageMap.remove(message.deliveryTag);
+//         }
+     }
 
     /**
      * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
      * messages to remove the queue reference and also decrement any message reference counts, without
-     * actually removing the item sine we may get an ack for a delivery tag that was generated from the
+     * actually removing the item since we may get an ack for a delivery tag that was generated from the
      * deleted queue.
      *
-     * @param queue
+     * @param queue the queue that has been deleted
+     * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
      */
-    public void queueDeleted(AMQQueue queue)
+    public void queueDeleted(final AMQQueue queue) throws AMQException
     {
-        synchronized(_unacknowledgedMessageMapLock)
+        _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
+            public boolean callback(UnacknowledgedMessage message) throws AMQException
             {
-                final UnacknowledgedMessage unackedMsg = unacked.getValue();
-                // we can compare the reference safely in this case
-                if (unackedMsg.queue == queue)
+                if (message.queue == queue)
                 {
-                    unackedMsg.queue = null;
                     try
                     {
-                        unackedMsg.message.decrementReference();
+                        message.discard(_storeContext);
+                        message.queue = null;
                     }
                     catch (AMQException e)
                     {
-                        _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
+                        _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
                                    e, e);
                     }
                 }
+                return false;
             }
-        }
+
+            public void visitComplete()
+            {
+            }
+        });
     }
 
     /**
@@ -533,151 +732,11 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        if (_transactional)
-        {
-            //check that the tag exists to give early failure
-            if (!multiple || deliveryTag > 0)
-            {
-                checkAck(deliveryTag);
-            }
-            //we use a single txn op for all acks and update this op
-            //as new acks come in. If this is the first ack in the txn
-            //we will need to create and enlist the op.
-            if (ackOp == null)
-            {
-                ackOp = new TxAck(new AckMap());
-                _txnBuffer.enlist(ackOp);
-            }
-            //update the op to include this ack request
-            if (multiple && deliveryTag == 0)
-            {
-                synchronized(_unacknowledgedMessageMapLock)
-                {
-                    //if have signalled to ack all, that refers only
-                    //to all at this time
-                    ackOp.update(_lastDeliveryTag, multiple);
-                }
-            }
-            else
-            {
-                ackOp.update(deliveryTag, multiple);
-            }
-        }
-        else
-        {
-            handleAcknowledgement(deliveryTag, multiple);
-        }
-    }
-
-    private void checkAck(long deliveryTag) throws AMQException
-    {
-        synchronized(_unacknowledgedMessageMapLock)
-        {
-            if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
-            {
-                throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
-            }
-        }
-    }
-
-    private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
-    {
-        if (_log.isDebugEnabled())
+        synchronized (_unacknowledgedMessageMap.getLock())
         {
-            _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
-                       " and multiple " + multiple);
-        }
-        if (multiple)
-        {
-            LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
-            synchronized(_unacknowledgedMessageMapLock)
-            {
-                if (deliveryTag == 0)
-                {
-                    //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
-                    _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
-                    acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
-                    _unacknowledgedMessageMap.clear();
-                }
-                else
-                {
-                    if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
-                    {
-                        throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
-                    }
-                    Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
-
-                    while (i.hasNext())
-                    {
-
-                        Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
-
-                        if (unacked.getKey() > deliveryTag)
-                        {
-                            //This should not occur now.
-                            throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
-                        }
-
-                        i.remove();
-
-                        acked.add(unacked.getValue());
-                        if (unacked.getKey() == deliveryTag)
-                        {
-                            break;
-                        }
-                    }
-                }
-            }// synchronized
-
-            if (_log.isTraceEnabled())
-            {
-                _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
-                           acked.size() + " items.");
-            }
-
-            for (UnacknowledgedMessage msg : acked)
-            {
-                if (!_browsedAcks.contains(deliveryTag))
-                {
-                    msg.discard();
-                }
-                else
-                {
-                    _browsedAcks.remove(deliveryTag);
-                }
-            }
-
-        }
-        else
-        {
-            UnacknowledgedMessage msg;
-            synchronized(_unacknowledgedMessageMapLock)
-            {
-                msg = _unacknowledgedMessageMap.remove(deliveryTag);
-            }
-
-            if (msg == null)
-            {
-                _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
-                throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
-            }
-
-            if (!_browsedAcks.contains(deliveryTag))
-            {
-                msg.discard();
-            }
-            else
-            {
-                _browsedAcks.remove(deliveryTag);
-            }
-
-            if (_log.isTraceEnabled())
-            {
-                _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
-            }
+            _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+            checkSuspension();
         }
-
-        checkSuspension();
     }
 
     /**
@@ -685,19 +744,24 @@
      *
      * @return the map of unacknowledged messages
      */
-    public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap()
+    public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
     {
         return _unacknowledgedMessageMap;
     }
 
+    public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
+    {
+        _browsedAcks.add(deliveryTag);
+        addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+    }
+
     private void checkSuspension()
     {
         boolean suspend;
-        //noinspection SynchronizeOnNonFinalField
-        synchronized(_unacknowledgedMessageMapLock)
-        {
-            suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
-        }
+        
+        suspend = ((_prefetch_HighWaterMark != 0) &&  _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+                 || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+        
         setSuspended(suspend);
     }
 
@@ -707,11 +771,8 @@
 
         if (isSuspended && !suspended)
         {
-            synchronized(_unacknowledgedMessageMapLock)
-            {
-                // Continue being suspended if we are above the _prefetch_LowWaterMark
-                suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
-            }
+            // Continue being suspended if we are above the _prefetch_LowWaterMark
+            suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
         }
 
         boolean wasSuspended = _suspended.getAndSet(suspended);
@@ -740,33 +801,22 @@
 
     public void commit() throws AMQException
     {
-        if (ackOp != null)
+        if (!isTransactional())
         {
-            ackOp.consolidate();
-            if (ackOp.checkPersistent())
-            {
-                _txnBuffer.containsPersistentChanges();
-            }
-            ackOp = null;//already enlisted, after commit will reset regardless of outcome
+            throw new AMQException("Fatal error: commit called on non-transactional channel");
         }
-
-        _txnBuffer.commit();
-        //TODO: may need to return 'immediate' messages at this point
+        _txnContext.commit();
     }
 
     public void rollback() throws AMQException
     {
-        //need to protect rollback and close from each other...
-        synchronized(_txnBuffer)
-        {
-            _txnBuffer.rollback();
-        }
+        _txnContext.rollback();
     }
 
     public String toString()
     {
         StringBuilder sb = new StringBuilder(30);
-        sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional);
+        sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
         sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
         sb.append("/").append(_prefetch_HighWaterMark);
         return sb.toString();
@@ -782,129 +832,46 @@
         return _defaultQueue;
     }
 
-    public void processReturns(AMQProtocolSession session)
-    {
-        for (AMQDataBlock block : _returns)
-        {
-            session.writeFrame(block);
-        }
-        _returns.clear();
-    }
-
-    public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+    public StoreContext getStoreContext()
     {
-        _browsedAcks.add(deliveryTag);
-        addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+        return _storeContext;
     }
 
-    //we use this wrapper to ensure we are always using the correct
-    //map instance (its not final unfortunately)
-    private class AckMap implements UnacknowledgedMessageMap
+    public void processReturns(AMQProtocolSession session) throws AMQException
     {
-        public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+        for (RequiredDeliveryException bouncedMessage : _returnMessages)
         {
-            impl().collect(deliveryTag, multiple, msgs);
-        }
-
-        public void remove(List<UnacknowledgedMessage> msgs)
-        {
-            impl().remove(msgs);
-        }
-
-        private UnacknowledgedMessageMap impl()
-        {
-            return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
+            AMQMessage message = bouncedMessage.getAMQMessage();
+            session.writeResponse(_channelId, message.getMessageId(), message.getTransferBody());
+//            message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage()));
         }
+        _returnMessages.clear();
     }
 
-    private class Store implements TxnOp
-    {
-        //just use this to do a store of the message during the
-        //prepare phase. Any enqueueing etc is done by TxnOps enlisted
-        //by the queues themselves.
-        private final AMQMessage _msg;
-
-        Store(AMQMessage msg)
-        {
-            _msg = msg;
-        }
-
-        public void prepare() throws AMQException
-        {
-            _msg.storeMessage();
-            //the routers reference can now be released
-            _msg.decrementReference();
-        }
-
-        public void undoPrepare()
-        {
-        }
-
-        public void commit()
-        {
-        }
-
-        public void rollback()
-        {
-        }
-    }
 
-    private class Cleanup implements TxnOp
+    public boolean wouldSuspend(AMQMessage msg)
     {
-        private final AMQMessage _msg;
-
-        Cleanup(AMQMessage msg)
-        {
-            _msg = msg;
-        }
-
-        public void prepare() throws AMQException
+        if (isSuspended())
         {
+            return true;
         }
-
-        public void undoPrepare()
-        {
-            //don't need to do anything here, if the store's txn failed
-            //when processing prepare then the message was not stored
-            //or enqueued on any queues and can be discarded
-        }
-
-        public void commit()
+        else
         {
-            //The routers reference can now be released.  This is done
-            //here to ensure that it happens after the queues that
-            //enqueue it have incremented their counts (which as a
-            //memory only operation is done in the commit phase).
-            try
-            {
-                _msg.decrementReference();
-            }
-            catch (AMQException e)
-            {
-                _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
-            }
-            try
+            boolean willSuspend = ((_prefetch_HighWaterMark != 0) &&  _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+            if(!willSuspend)
             {
-                _msg.checkDeliveredToConsumer();
+                final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+
+                willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
             }
-            catch (NoConsumersException e)
+
+
+            if(willSuspend)
             {
-                //TODO: store this for delivery after the commit-ok
-                throw new Error("XXX");
-                //_returns.add(e.getReturnMessage(_channelId));
+                setSuspended(true);
             }
+            return willSuspend;
         }
 
-        public void rollback()
-        {
-        }
     }
-
-    private static class Reference {
-
-        public List<AMQMessage> messages = new LinkedList();
-        public List<ByteBuffer> contents = new LinkedList();
-
-    }
-
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java Wed Feb 14 12:02:03 2007
@@ -62,7 +62,7 @@
  * Main entry point for AMQPD.
  *
  */
-public class Main implements ProtocolVersionList, Managable
+public class Main implements ProtocolVersionList
 {
     private static final Logger _logger = Logger.getLogger(Main.class);
 
@@ -70,7 +70,8 @@
 
     private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
 
-    private AMQBrokerManagerMBean _mbean = null;
+    
+    private static Main _instance;
 
     protected static class InitException extends Exception
     {
@@ -265,7 +266,6 @@
         }
         bind(port, connectorConfig);
 
-        createAndRegisterBrokerMBean();
     }
 
     protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
@@ -324,7 +324,7 @@
             // implementation provided by MINA
             if (connectorConfig.enableExecutorPool)
             {
-                sconfig.setThreadModel(new ReadWriteThreadModel());
+                sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
             }
 
             if (connectorConfig.enableNonSSL)
@@ -368,7 +368,7 @@
     public static void main(String[] args)
     {
 
-        new Main(args);
+        _instance = new Main(args);
     }
 
     private byte[] parseIP(String address) throws Exception
@@ -430,21 +430,4 @@
         }
     }
 
-    private void createAndRegisterBrokerMBean() throws AMQException
-    {
-        try
-        {
-            _mbean = new AMQBrokerManagerMBean();
-            _mbean.register();
-        }
-        catch (JMException ex)
-        {
-            throw new AMQException("Exception occured in creating AMQBrokerManager MBean");    
-        }
-    }
-
-    public ManagedObject getManagedObject()
-    {
-        return _mbean;
-    }    
 }