You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC
svn commit: r507672 [2/16] - in /incubator/qpid/branches/qpid.0-9:
gentools/src/org/apache/qpid/gentools/ gentools/templ.java/
gentools/templ.net/ java/ java/broker/ java/broker/bin/
java/broker/distribution/ java/broker/distribution/src/ java/broker/d...
Modified: incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java Wed Feb 14 12:02:03 2007
@@ -61,7 +61,9 @@
static private Method mbGetGenerateMethod;
static private Method mbMangledGetGenerateMethod;
static private Method mbParamListGenerateMethod;
+ static private Method mbPassedParamListGenerateMethod;
static private Method mbMangledParamListGenerateMethod;
+ static private Method mbMangledPassedParamListGenerateMethod;
static private Method mbBodyInitGenerateMethod;
static private Method mbMangledBodyInitGenerateMethod;
static private Method mbSizeGenerateMethod;
@@ -131,11 +133,22 @@
AmqpVersionSet.class, int.class, int.class, boolean.class); }
catch (NoSuchMethodException e) { e.printStackTrace(); }
+
+ try { mbPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
+ "generateMbPassedParamList", String.class, AmqpField.class,
+ AmqpVersionSet.class, int.class, int.class, boolean.class); }
+ catch (NoSuchMethodException e) { e.printStackTrace(); }
+
try { mbMangledParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
"generateMbMangledParamList", AmqpField.class,
int.class, int.class, boolean.class); }
catch (NoSuchMethodException e) { e.printStackTrace(); }
+ try { mbMangledPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
+ "generateMbMangledPassedParamList", AmqpField.class,
+ int.class, int.class, boolean.class); }
+ catch (NoSuchMethodException e) { e.printStackTrace(); }
+
try { mbBodyInitGenerateMethod = JavaGenerator.class.getDeclaredMethod(
"generateMbBodyInit", String.class, AmqpField.class,
AmqpVersionSet.class, int.class, int.class, boolean.class); }
@@ -320,10 +333,10 @@
"EncodingUtils.writeUnsignedShort(buffer, #)", // encode expression
"# = buffer.getUnsignedShort()")); // decode expression
typeMap.put("shortstr", new DomainInfo(
- "String", // Java code type
+ "AMQShortString", // Java code type
"EncodingUtils.encodedShortStringLength(#)", // size
"EncodingUtils.writeShortStringBytes(buffer, #)", // encode expression
- "# = EncodingUtils.readShortString(buffer)")); // decode expression
+ "# = EncodingUtils.readAMQShortString(buffer)")); // decode expression
typeMap.put("table", new DomainInfo(
"FieldTable", // Java code type
"EncodingUtils.encodedFieldTableLength(#)", // size
@@ -457,11 +470,11 @@
if (token.compareTo("${CLASS}") == 0 && thisClass != null)
return thisClass.name;
if (token.compareTo("${CLASS_ID_INIT}") == 0 && thisClass != null)
- return generateIndexInitializer("classIdMap", thisClass.indexMap, 8);
+ return generateIndexInitializer("registerClassId", thisClass.indexMap, 8);
if (token.compareTo("${METHOD}") == 0 && method != null)
return method.name;
if (token.compareTo("${METHOD_ID_INIT}") == 0 && method != null)
- return generateIndexInitializer("methodIdMap", method.indexMap, 8);
+ return generateIndexInitializer("registerMethodId", method.indexMap, 8);
if (token.compareTo("${FIELD}") == 0 && field != null)
return field.name;
@@ -573,6 +586,16 @@
codeSnippet += fieldMap.parseFieldMap(mbParamListGenerateMethod,
mbMangledParamListGenerateMethod, 42, 4, this);
}
+
+ else if (token.compareTo("${mb_field_passed_parameter_list}") == 0)
+ {
+ // <cringe> The code generated by this is ugly... It puts a comma on a line by itself!
+ // TODO: Find a more elegant solution here sometime...
+ codeSnippet = fieldMap.size() > 0 ? Utils.createSpaces(42) + "," + cr : "";
+ // </cringe>
+ codeSnippet += fieldMap.parseFieldMap(mbPassedParamListGenerateMethod,
+ mbMangledPassedParamListGenerateMethod, 42, 4, this);
+ }
else if (token.compareTo("${mb_field_body_initialize}") == 0)
{
codeSnippet = fieldMap.parseFieldMap(mbBodyInitGenerateMethod,
@@ -712,16 +735,12 @@
String indent = Utils.createSpaces(indentSize);
StringBuffer sb = new StringBuffer();
- Iterator<Integer> iItr = indexMap.keySet().iterator();
- while (iItr.hasNext())
+ for (Integer index : indexMap.keySet())
{
- int index = iItr.next();
AmqpVersionSet versionSet = indexMap.get(index);
- Iterator<AmqpVersion> vItr = versionSet.iterator();
- while (vItr.hasNext())
+ for (AmqpVersion version : versionSet)
{
- AmqpVersion version = vItr.next();
- sb.append(indent + mapName + ".put(\"" + version.toString() + "\", " + index + ");" + cr);
+ sb.append(indent + mapName + "( (byte) " + version.getMajor() +", (byte) " + version.getMinor() + ", " + index + ");" + cr);
}
}
return sb.toString();
@@ -746,12 +765,12 @@
{
int classIndex = findIndex(thisClass.indexMap, version);
int methodIndex = findIndex(method.indexMap, version);
- sb.append(indent + "classIDMethodIDVersionBodyMap.put(" + cr);
- sb.append(indent + tab + "createMapKey((short)" + classIndex +
- ", (short)" + methodIndex + ", (byte)" + version.getMajor() +
- ", (byte)" + version.getMinor() + "), " + cr);
+ sb.append(indent + "registerMethod(" + cr);
+ sb.append(indent + tab + "(short)" + classIndex +
+ ", (short)" + methodIndex + ", (byte)" + version.getMajor() +
+ ", (byte)" + version.getMinor() + ", " + cr);
sb.append(indent + tab + Utils.firstUpper(thisClass.name) +
- Utils.firstUpper(method.name) + "Body.class);" + cr);
+ Utils.firstUpper(method.name) + "Body.getFactory());" + cr);
}
catch (Exception e) {} // Ignore
}
@@ -924,6 +943,15 @@
(nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr;
}
+
+ protected String generateMbPassedParamList(String codeType, AmqpField field,
+ AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag)
+ {
+ return Utils.createSpaces(indentSize) + field.name +
+ (nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr;
+ }
+
+
protected String generateMbMangledParamList(AmqpField field, int indentSize,
int tabSize, boolean nextFlag)
throws AmqpTypeMappingException
@@ -943,10 +971,29 @@
return sb.toString();
}
+ protected String generateMbMangledPassedParamList(AmqpField field, int indentSize,
+ int tabSize, boolean nextFlag)
+ throws AmqpTypeMappingException
+ {
+ StringBuffer sb = new StringBuffer();
+ Iterator<String> dItr = field.domainMap.keySet().iterator();
+ int domainCntr = 0;
+ while (dItr.hasNext())
+ {
+ String domainName = dItr.next();
+ AmqpVersionSet versionSet = field.domainMap.get(domainName);
+ sb.append(Utils.createSpaces(indentSize) + field.name + "_" +
+ (domainCntr++) + (nextFlag ? "," : "") + " // AMQP version(s): " +
+ versionSet + cr);
+ }
+ return sb.toString();
+ }
+
+
protected String generateMbBodyInit(String codeType, AmqpField field,
AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag)
{
- return Utils.createSpaces(indentSize) + "bodyFrame." + field.name + " = " + field.name +
+ return Utils.createSpaces(indentSize) + "this." + field.name + " = " + field.name +
";" + cr;
}
@@ -1004,23 +1051,26 @@
int ordinal, int indentSize, int tabSize)
{
String indent = Utils.createSpaces(indentSize);
- String bitArrayName = "bitArray_" + ordinal;
- StringBuffer sb = new StringBuffer(indent + "boolean[] " + bitArrayName +
- " = new boolean[] { ");
- for (int i=0; i<bitFieldList.size(); i++)
+
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ while(i <bitFieldList.size())
{
- if (i != 0)
+
+ StringBuilder line = new StringBuilder();
+
+ for (int j=0; i<bitFieldList.size() && j<8; i++, j++)
{
- if ((i + 3) % 6 == 0)
- sb.append("," + cr + indent + Utils.createSpaces(tabSize));
- else
- sb.append(", ");
+ if (j != 0)
+ {
+ line.append(", ");
+ }
+ line.append(bitFieldList.get(i));
}
- sb.append(bitFieldList.get(i));
+
+ sb.append(indent +
+ typeMap.get("bit").encodeExpression.replaceAll("#", line.toString()) + ";" + cr);
}
- sb.append(" };" + cr);
- sb.append(Utils.createSpaces(indentSize) +
- typeMap.get("bit").encodeExpression.replaceAll("#", bitArrayName) + ";" + cr);
return sb.toString();
}
@@ -1038,14 +1088,25 @@
int ordinal, int indentSize, int tabSize)
{
String indent = Utils.createSpaces(indentSize);
- String bitArrayName = "bitArray_" + ordinal;
- StringBuffer sb = new StringBuffer();
- sb.append(indent +
- typeMap.get("bit").decodeExpression.replaceAll("#", "boolean[] " + bitArrayName) +
- ";" + cr);
- for (int i=0; i<bitFieldList.size(); i++)
+
+ StringBuilder sb = new StringBuilder(indent);
+ // Multiple occurrences of byte value blocks will result in multiple declarations
+ // unless each is named uniquely.
+ String varName = "packedValue_" + ordinal;
+ sb.append("byte " + varName + ";");
+ sb.append(cr);
+
+ // RG HERE!
+
+ int i = 0;
+ while(i < bitFieldList.size())
{
- sb.append(indent + bitFieldList.get(i) + " = " + bitArrayName + "[" + i + "];" + cr);
+ sb.append(indent + varName + " = EncodingUtils.readByte(buffer);" + cr);
+
+ for(int j = 0; i < bitFieldList.size() && j < 8; i++, j++)
+ {
+ sb.append(indent + bitFieldList.get(i) + " = ( " + varName + " & (byte) (1 << " + j + ") ) != 0;" + cr);
+ }
}
return sb.toString();
}
Modified: incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/Main.java Wed Feb 14 12:02:03 2007
@@ -38,8 +38,11 @@
{
private static final String defaultOutDir = ".." + Utils.fileSeparator + "gen";
private static final String defaultCppTemplateDir = ".." + Utils.fileSeparator + "templ.cpp";
+ private static final String defaultDotnetTemplateDir = ".." + Utils.fileSeparator + "templ.net";
private static final String defaultJavaTemplateDir = ".." + Utils.fileSeparator + "templ.java";
+ private enum GeneratorLangEnum { CPP, DOTNET, JAVA }
+
private DocumentBuilder docBuilder;
private AmqpVersionSet versionSet;
private Generator generator;
@@ -49,7 +52,7 @@
private String outDir;
private String tmplDir;
- private boolean javaFlag;
+ private GeneratorLangEnum generatorLang;
private ArrayList<String> xmlFiles;
private File[] modelTemplateFiles;
private File[] classTemplateFiles;
@@ -81,13 +84,20 @@
// 0. Initialize
outDir = defaultOutDir;
tmplDir = null;
- javaFlag = true;
+ generatorLang = GeneratorLangEnum.CPP; // Default generation language
xmlFiles.clear();
processArgs(args);
- if (javaFlag)
- prepareJava();
- else
+ switch (generatorLang)
+ {
+ case JAVA:
+ prepareJava();
+ break;
+ case DOTNET:
+ prepareDotnet();
+ break;
+ default:
prepareCpp();
+ }
if (modelTemplateFiles.length == 0 && classTemplateFiles.length == 0 &&
methodTemplateFiles.length == 0 && fieldTemplateFiles.length == 0)
@@ -128,11 +138,15 @@
{
case 'c':
case 'C':
- javaFlag = false;
+ generatorLang = GeneratorLangEnum.CPP;
break;
case 'j':
case 'J':
- javaFlag = true;
+ generatorLang = GeneratorLangEnum.JAVA;
+ break;
+ case 'n':
+ case 'N':
+ generatorLang = GeneratorLangEnum.DOTNET;
break;
case 'o':
case 'O':
@@ -182,6 +196,30 @@
};
}
+ private void prepareDotnet()
+ {
+ if (tmplDir == null)
+ tmplDir = defaultDotnetTemplateDir;
+ System.out.println(".NET generation mode.");
+ generator = new DotnetGenerator(versionSet);
+ constants = new AmqpConstantSet(generator);
+ domainMap = new AmqpDomainMap(generator);
+ model = new AmqpModel(generator);
+ // TODO: Add templates that should be handled in here...
+ modelTemplateFiles = new File[]
+ {
+// new File(tmplDir + Utils.fileSeparator + "XXXClass.tmpl"),
+ };
+ classTemplateFiles = new File[]
+ {
+// new File(tmplDir + Utils.fileSeparator + "XXXClass.tmpl"),
+ };
+ methodTemplateFiles = new File[]
+ {
+// new File(tmplDir + Utils.fileSeparator + "XXXClass.tmpl"),
+ };
+ }
+
private void prepareCpp()
{
if (tmplDir == null)
@@ -293,6 +331,7 @@
System.out.println("Usage: Main -c|-j [-o outDir] [-t tmplDir] XMLfile [XMLfile ...]");
System.out.println(" where -c: Generate C++.");
System.out.println(" -j: Generate Java.");
+ System.out.println(" -n: Generate .NET.");
System.out.println(" -o outDir: Use outDir as the output dir (default=\"" + defaultOutDir + "\").");
System.out.println(" -t tmplDir: Find templates in tmplDir.");
System.out.println(" Defaults: \"" + defaultCppTemplateDir + "\" for C++;");
Modified: incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl Wed Feb 14 12:02:03 2007
@@ -28,36 +28,107 @@
package org.apache.qpid.framing;
-import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Arrays;
import org.apache.mina.common.ByteBuffer;
public class ${CLASS}${METHOD}Body extends AMQMethodBody implements EncodableAMQDataBlock
{
- public static final TreeMap<String, Integer> classIdMap = new TreeMap<String, Integer>();
- public static final TreeMap<String, Integer> methodIdMap = new TreeMap<String, Integer>();
+ private static final AMQMethodBodyInstanceFactory factory = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new ${CLASS}${METHOD}Body(major, minor, in);
+ }
+
+ public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID, in);
+ }
+
+ };
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return factory;
+ }
+ public static HashMap<Integer, Integer> classIdMap = new HashMap<Integer, Integer>();
+ public static HashMap<Integer, Integer> methodIdMap = new HashMap<Integer, Integer>();
+
+ private static void registerMethodId(byte major, byte minor, int methodId)
+ {
+ methodIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), methodId);
+ }
+
+ private static void registerClassId(byte major, byte minor, int classId)
+ {
+ classIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), classId);
+ }
+
+
static
{
-${CLASS_ID_INIT}
-${METHOD_ID_INIT}
+ ${CLASS_ID_INIT}
+ ${METHOD_ID_INIT}
}
// Fields declared in specification
%{FLIST} ${field_declaration}
+ private final int _clazz;
+ private final int _method;
+
+
// Constructor
- public ${CLASS}${METHOD}Body(byte major, byte minor)
+
+ public ${CLASS}${METHOD}Body(byte major, byte minor, ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ this(major, minor, getClazz(major,minor), getMethod(major,minor), buffer);
+ }
+
+ public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer) throws AMQFrameDecodingException
{
super(major, minor);
- }
-
- public int getClazz() { return classIdMap.get(major + "-" + minor); }
- public int getMethod() { return methodIdMap.get(major + "-" + minor); }
- public static int getClazz(byte major, byte minor) { return classIdMap.get(major + "-" + minor); }
- public static int getMethod(byte major, byte minor) { return methodIdMap.get(major + "-" + minor); }
-
- // Field methods
+ _clazz = clazzID;
+ _method = methodID;
+%{FLIST} ${mb_field_decode}
+ }
+
+ public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID
+%{FLIST} ${mb_field_parameter_list}
+ )
+ {
+ super(major, minor);
+ _clazz = getClazz(major,minor);
+ _method = getMethod(major,minor);
+%{FLIST} ${mb_field_body_initialize}
+ }
+
+ public int getClazz()
+ {
+ return _clazz;
+ }
+
+ public int getMethod()
+ {
+ return _method;
+ }
+
+ public static int getClazz(byte major, byte minor)
+ {
+ return classIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8));
+ }
+
+ public static int getMethod(byte major, byte minor)
+ {
+ return methodIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8));
+ }
+
+
+ // Field methods
%{FLIST} ${mb_field_get_method}
public int getBodySize()
@@ -88,17 +159,24 @@
%{FLIST} ${mb_field_parameter_list}
)
{
- ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major, minor);
-%{FLIST} ${mb_field_body_initialize}
+ return createMethodBody(major, minor, getClazz(major, minor), getMethod(major, minor)
+%{FLIST} ${mb_field_passed_parameter_list}
+ );
+ }
- return bodyFrame;
+ public static ${CLASS}${METHOD}Body createMethodBody(byte major, byte minor, int clazzID, int methodID
+%{FLIST} ${mb_field_parameter_list}
+ )
+ {
+ return new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID
+%{FLIST} ${mb_field_passed_parameter_list}
+ );
}
public ${CLASS}${METHOD}Body copy()
{
- ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major, minor);
-%{FLIST} ${mb_field_body_initialize}
-
- return bodyFrame;
+ return new ${CLASS}${METHOD}Body(major, minor, getClazz(major, minor), getMethod(major, minor)
+%{FLIST} ${mb_field_passed_parameter_list}
+ );
}
}
Modified: incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodRegistryClass.tmpl Wed Feb 14 12:02:03 2007
@@ -31,50 +31,101 @@
import java.util.HashMap;
import java.lang.reflect.Constructor;
import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
-class MainRegistry
+public class MainRegistry
{
+ private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
+
+
private static final Logger _log = Logger.getLogger(MainRegistry.class);
- private static HashMap<Long, Class> classIDMethodIDVersionBodyMap = new HashMap<Long, Class>();
+
+ private static final int DEFAULT_MINOR_VERSION_COUNT = 10;
+ private static final int DEFAULT_MAJOR_VERSION_COUNT = 10;
+
+ private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][];
+
static
{
%{CLIST} ${reg_map_put_method}
}
- public static AMQMethodBody get(short classID, short methodID, byte major, byte minor)
+ public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
throws AMQFrameDecodingException
{
- Class bodyClass = classIDMethodIDVersionBodyMap.get(
- createMapKey(classID, methodID, major, minor));
- if (bodyClass == null)
- {
- throw new AMQFrameDecodingException(_log,
- "Unable to find a suitable decoder for class " + classID + " and method " +
- methodID + " in AMQP version " + major + "-" + minor + ".");
- }
- try
- {
- Constructor initFn = bodyClass.getConstructor(byte.class, byte.class);
- return (AMQMethodBody) initFn.newInstance(major, minor);
- }
- catch (Exception e)
- {
- throw new AMQFrameDecodingException(_log,
- "Unable to instantiate body class for class " + classID + " and method " +
- methodID + " in AMQP version " + major + "-" + minor + " : " + e, e);
- }
+ VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor);
+ AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID);
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Unable to find a suitable decoder for class " + classID + " and method " +
+ methodID + " in AMQP version " + major + "-" + minor + ".");
+ }
+ return bodyFactory.newInstance(major, minor, in, size);
+
+
}
+
+ public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor)
+ {
+ try
+ {
+ return _specificRegistries[(int)major][(int)minor];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+
+
+ }
- private static Long createMapKey(short classID, short methodID, byte major, byte minor)
- {
- /**
- * Mapping of 4 components into a guaranteed unique key:
- * MSB LSB
- * +----+----+----+----+----+----+-----+-----+
- * | 0 | classID |methodID |major|minor|
- * +----+----+----+----+----+----+-----+-----+
- */
- return new Long(((long)classID << 32) + ((long)methodID << 16) + ((long)major << 8) + minor);
+ private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor)
+ {
+ VersionSpecificRegistry[][] registries = _specificRegistries;
+ if(major >= registries.length)
+ {
+ _specificRegistries = new VersionSpecificRegistry[(int)major + 1][];
+ System.arraycopy(registries, 0, _specificRegistries, 0, registries.length);
+ registries = _specificRegistries;
+ }
+ if(registries[major] == null)
+ {
+ registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ];
+ }
+ else if(registries[major].length <= minor)
+ {
+ VersionSpecificRegistry[] minorArray = registries[major];
+ registries[major] = new VersionSpecificRegistry[ minor + 1 ];
+ System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length);
+
+ }
+
+ VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor);
+
+ registries[major][minor] = newRegistry;
+
+ return newRegistry;
+ }
+
+ private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
+ {
+ VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor);
+ if(registry == null)
+ {
+ registry = addVersionSpecificRegistry(major,minor);
+
+ }
+
+ registry.registerMethod(classID, methodID, instanceFactory);
+
}
+
+
}
Propchange: incubator/qpid/branches/qpid.0-9/java/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Feb 14 12:02:03 2007
@@ -2,3 +2,6 @@
blaze.iws
build
intellijclasses
+qpid.iml
+qpid.ipr
+qpid.iws
Propchange: incubator/qpid/branches/qpid.0-9/java/broker/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Feb 14 12:02:03 2007
@@ -2,3 +2,5 @@
intellijclasses
log
target
+qpid-broker.ipr
+qpid-broker.iws
Modified: incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/bin/qpid-server.bat Wed Feb 14 12:02:03 2007
@@ -17,7 +17,7 @@
@REM under the License.
@REM
-@echo off
+echo off
REM Script to run the Qpid Java Broker
rem Guess QPID_HOME if not defined
@@ -60,9 +60,11 @@
shift
goto loop
+rem QPID_OPTS is intended to hold any -D properties to pass to the java command below
+rem The user must enclose any value for QPID_OPTS in double quotes
:runCommand
set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%\bin\java" -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+"%JAVA_HOME%\bin\java" -server -Xmx1024m %QPID_OPTS% -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
:end
Propchange: incubator/qpid/branches/qpid.0-9/java/broker/distribution/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Feb 14 12:02:03 2007
@@ -0,0 +1 @@
+target
Modified: incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/etc/config.xml Wed Feb 14 12:02:03 2007
@@ -79,6 +79,37 @@
</mechanisms>
</sasl>
</security>
+ <virtualhosts>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+ <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> -->
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <environment-path>localhost-store</environment-path>
+ </store>
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
<heartbeat>
<delay>0</delay>
<timeoutFactor>2.0</timeoutFactor>
@@ -86,8 +117,6 @@
<queue>
<auto_register>true</auto_register>
</queue>
- <store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
+
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
Modified: incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml Wed Feb 14 12:02:03 2007
@@ -8,9 +8,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- -
+ -
- http://www.apache.org/licenses/LICENSE-2.0
- -
+ -
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,14 +36,27 @@
<param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
</layout>
</appender>
+
+ <!--
+ After merge with trunk, there seems to be considerable confusion over
+ this section, so all have been commented out for now...
+ -->
- <category name="org.apache.qpid.framing">
+ <!--<category name="org.apache.qpid.framing">
<priority value="debug"/>
</category>
- <category name="org.apache.qpid.server.handler">
+ <category name="org.apache.qpid.server.queue">
<priority value="debug"/>
</category>
+
+ <category name="org.apache.qpid.server.txn">
+ <priority value="debug"/>
+ </category>
+
+ <category name="org.apache.qpid.server.handler">
+ <priority value="debug"/>
+ </category>-->
<root>
<priority value="debug"/>
Modified: incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/etc/virtualhosts.xml Wed Feb 14 12:02:03 2007
@@ -21,7 +21,17 @@
-->
<virtualhosts>
<virtualhost>
- <path>/development</path>
+ <path>localhost</path>
+ <bind>direct://amq.direct//queue</bind>
+ <bind>direct://amq.direct//ping</bind>
+ </virtualhost>
+ <virtualhost>
+ <path>development</path>
+ <bind>direct://amq.direct//queue</bind>
+ <bind>direct://amq.direct//ping</bind>
+ </virtualhost>
+ <virtualhost>
+ <path>test</path>
<bind>direct://amq.direct//queue</bind>
<bind>direct://amq.direct//ping</bind>
</virtualhost>
Modified: incubator/qpid/branches/qpid.0-9/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/pom.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/pom.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/pom.xml Wed Feb 14 12:02:03 2007
@@ -37,53 +37,44 @@
</properties>
<dependencies>
+
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-common</artifactId>
</dependency>
+
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
+
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
+
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.mina</groupId>
- <artifactId>mina-filter-ssl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.mina</groupId>
- <artifactId>mina-java5</artifactId>
- </dependency>
- <dependency>
- <groupId>backport-util-concurrent</groupId>
- <artifactId>backport-util-concurrent</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </dependency>
+
+ <!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
+ <scope>test</scope>
</dependency>
+
</dependencies>
<build>
+
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -128,7 +119,32 @@
</systemProperties>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
+
+ <testResources>
+ <testResource>
+ <targetPath>src/</targetPath>
+ <filtering>false</filtering>
+ <directory>src/test/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </testResource>
+ </testResources>
+
</build>
</project>
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/grammar/SelectorParser.jj Wed Feb 14 12:02:03 2007
@@ -66,8 +66,9 @@
import java.io.*;
import java.util.*;
-import javax.jms.InvalidSelectorException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.filter.*;
/**
@@ -81,14 +82,14 @@
this(new StringReader(""));
}
- public BooleanExpression parse(String sql) throws InvalidSelectorException {
+ public BooleanExpression parse(String sql) throws AMQInvalidSelectorException {
this.ReInit(new StringReader(sql));
try {
return this.JmsSelector();
}
catch (Throwable e) {
- throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e);
+ throw (AMQInvalidSelectorException)new AMQInvalidSelectorException(sql).initCause(e);
}
}
@@ -589,7 +590,7 @@
(
t = <ID>
{
- left = new PropertyExpression(t.image);
+ left = new PropertyExpression(new AMQShortString(t.image));
}
)
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Wed Feb 14 12:02:03 2007
@@ -30,7 +30,9 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -49,20 +51,28 @@
private final ExchangeFactory _exchangeFactory;
private final MessageStore _messageStore;
+ private final VirtualHost.VirtualHostMBean _virtualHostMBean;
+
@MBeanConstructor("Creates the Broker Manager MBean")
- public AMQBrokerManagerMBean() throws JMException
+ public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException
{
super(ManagedBroker.class, ManagedBroker.TYPE);
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
- _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
- _messageStore = ApplicationRegistry.getInstance().getMessageStore();
+
+ _virtualHostMBean = virtualHostMBean;
+ VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
+
+
+
+
+ _queueRegistry = virtualHost.getQueueRegistry();
+ _exchangeRegistry = virtualHost.getExchangeRegistry();
+ _messageStore = virtualHost.getMessageStore();
+ _exchangeFactory = virtualHost.getExchangeFactory();
}
public String getObjectInstanceName()
{
- return this.getClass().getName();
+ return _virtualHostMBean.getVirtualHost().getName();
}
/**
@@ -81,10 +91,10 @@
{
synchronized (_exchangeRegistry)
{
- Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
+ Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
if (exchange == null)
{
- exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0);
+ exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
_exchangeRegistry.registerExchange(exchange);
}
else
@@ -114,7 +124,7 @@
// when there are no bindings.
try
{
- _exchangeRegistry.unregisterExchange(exchangeName, false);
+ _exchangeRegistry.unregisterExchange(new AMQShortString(exchangeName), false);
}
catch (AMQException ex)
{
@@ -135,7 +145,7 @@
public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete)
throws JMException
{
- AMQQueue queue = _queueRegistry.getQueue(queueName);
+ AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
if (queue != null)
{
throw new JMException("The queue \"" + queueName + "\" already exists.");
@@ -143,7 +153,7 @@
try
{
- queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
+ queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, getVirtualHost());
if (queue.isDurable() && !queue.isAutoDelete())
{
_messageStore.createQueue(queue);
@@ -156,6 +166,11 @@
}
}
+ private VirtualHost getVirtualHost()
+ {
+ return _virtualHostMBean.getVirtualHost();
+ }
+
/**
* Deletes the queue from queue registry and persistant storage.
*
@@ -164,7 +179,7 @@
*/
public void deleteQueue(String queueName) throws JMException
{
- AMQQueue queue = _queueRegistry.getQueue(queueName);
+ AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("The Queue " + queueName + " is not a registerd queue.");
@@ -173,7 +188,7 @@
try
{
queue.delete();
- _messageStore.removeQueue(queueName);
+ _messageStore.removeQueue(new AMQShortString(queueName));
}
catch (AMQException ex)
@@ -182,11 +197,14 @@
}
}
- public ObjectName getObjectName() throws MalformedObjectNameException
+ public ManagedObject getParentObject()
{
- StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
- objectName.append(":type=").append(getType());
+ return _virtualHostMBean;
+ }
- return new ObjectName(objectName.toString());
+ // This will have a single instance for a virtual host, so not having the name property in the ObjectName
+ public ObjectName getObjectName() throws MalformedObjectNameException
+ {
+ return getObjectNameForSingleInstanceMBean();
}
} // End of MBean class
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Feb 14 12:02:03 2007
@@ -27,7 +27,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessageAppendBody;
@@ -43,14 +43,21 @@
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.AMQReference;
+import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.txn.TxnBuffer;
-import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.LocalTransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.mina.common.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -59,6 +66,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.Set;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -71,7 +79,7 @@
private final int _channelId;
- private boolean _transactional;
+ //private boolean _transactional;
private long _prefetch_HighWaterMark;
@@ -81,6 +89,8 @@
private ResponseManager _responseManager;
private AMQProtocolSession _session;
+ private long _prefetchSize;
+
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
@@ -102,30 +112,33 @@
/**
* The set of open references on this channel.
*/
- private Map<String, Reference> _references = new LinkedHashMap();
+ private Map<String, AMQReference> _references = new LinkedHashMap();
/**
* Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
*/
- private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
+ private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
- private final Object _unacknowledgedMessageMapLock = new Object();
-
- private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
-
- private long _lastDeliveryTag;
+ private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private final MessageRouter _exchanges;
- private final TxnBuffer _txnBuffer;
+ private TransactionalContext _txnContext;
+
+ /**
+ * A context used by the message store enabling it to track context for a given channel even across
+ * thread boundaries
+ */
+ private final StoreContext _storeContext = new StoreContext();
+
+ private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- private TxAck ackOp;
+ private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
- private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
private Set<Long> _browsedAcks = new HashSet<Long>();
// XXX: clean up arguments
@@ -139,22 +152,29 @@
_exchanges = exchanges;
_requestManager = new RequestManager(_session.getConnectionId(), channelId, _session, true);
_responseManager = new ResponseManager(_session.getConnectionId(), channelId, methodListener, _session, true);
- _txnBuffer = new TxnBuffer(_messageStore);
+ // by default the session is non-transactional
+ _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
- public int getChannelId()
+ /**
+ * Sets this channel to be part of a local transaction
+ */
+ public void setLocalTransactional()
{
- return _channelId;
+ _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
}
public boolean isTransactional()
{
- return _transactional;
+ // this does not look great but there should only be one "non-transactional"
+ // transactional context, while there could be several transactional ones in
+ // theory
+ return !(_txnContext instanceof NonTransactionalContext);
}
- public void setTransactional(boolean transactional)
+ public int getChannelId()
{
- _transactional = transactional;
+ return _channelId;
}
public long getPrefetchCount()
@@ -167,6 +187,17 @@
_prefetch_HighWaterMark = prefetchCount;
}
+ public long getPrefetchSize()
+ {
+ return _prefetchSize;
+ }
+
+
+ public void setPrefetchSize(long prefetchSize)
+ {
+ _prefetchSize = prefetchSize;
+ }
+
public long getPrefetchLowMarkCount()
{
return _prefetch_LowWaterMark;
@@ -193,80 +224,165 @@
AMQMessage message;
switch (body.getContentType()) {
case INLINE_T:
- message = new AMQMessage(_messageStore, transferBody,
- Collections.singletonList(body.getContent()));
+ message = new AMQMessage(_messageStore, transferBody, Collections.singletonList(body.getContent()), _txnContext);
message.setPublisher(publisher);
- route(message);
+ routeCurrentMessage(message);
+ message.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
break;
case REF_T:
- Reference ref = getReference(body.getContentAsByteArray());
- message = new AMQMessage(_messageStore, transferBody, ref.contents);
+ AMQReference ref = getReference(body.getContentAsByteArray());
+ message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
message.setPublisher(publisher);
- ref.messages.add(message);
+ ref.addRefTransferBody(message);
break;
}
}
- private static String key(byte[] id) {
+ private static String key(byte[] id)
+ {
return new String(id);
}
- private Reference getReference(byte[] id) {
+ private AMQReference getReference(byte[] id)
+ {
String key = key(id);
- Reference ref = _references.get(key);
- if (ref == null) {
+ AMQReference ref = _references.get(key);
+ if (ref == null)
+ {
throw new IllegalArgumentException(key);
}
return ref;
}
- private Reference createReference(byte[] id) {
+ private AMQReference createReference(byte[] id)
+ {
String key = key(id);
- if (_references.containsKey(key)) {
+ if (_references.containsKey(key))
+ {
throw new IllegalArgumentException(key);
}
- Reference ref = new Reference();
+ AMQReference ref = new AMQReference(id);
_references.put(key, ref);
return ref;
}
- private Reference removeReference(byte[] id) {
+ private AMQReference removeReference(byte[] id)
+ {
String key = key(id);
- Reference ref = _references.remove(key);
- if (ref == null) {
+ AMQReference ref = _references.remove(key);
+ if (ref == null)
+ {
throw new IllegalArgumentException(key);
}
return ref;
}
- public void addMessageOpen(MessageOpenBody open) {
+ public void addMessageOpen(MessageOpenBody open)
+ {
createReference(open.reference);
}
- public void addMessageAppend(MessageAppendBody append) {
- Reference ref = getReference(append.reference);
- ref.contents.add(ByteBuffer.wrap(append.bytes));
+ public void addMessageAppend(MessageAppendBody append)
+ {
+ AMQReference ref = getReference(append.reference);
+ ref.appendContent(ByteBuffer.wrap(append.bytes));
}
- public void addMessageClose(MessageCloseBody close) throws AMQException {
- Reference ref = removeReference(close.reference);
- for (AMQMessage msg : ref.messages) {
- route(msg);
+ public void addMessageClose(MessageCloseBody close) throws AMQException
+ {
+ AMQReference ref = removeReference(close.reference);
+ for (AMQMessage msg : ref.getMessageList())
+ {
+ routeCurrentMessage(msg);
+ msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
}
}
- public void deliver(AMQMessage msg, String destination, final long deliveryTag) {
- deliver(msg, destination, new AMQMethodListener() {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
+ protected void routeCurrentMessage(AMQMessage msg) throws AMQException
+ {
+ try
+ {
+ _exchanges.routeContent(msg);
+ }
+ catch (NoRouteException e)
+ {
+ _returnMessages.add(e);
+ }
+ }
+//
+// public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
+// {
+// deliver(msg, destination, new AMQMethodListener()
+// {
+// public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+// {
+// AMQMethodBody method = evt.getMethod();
+// if (_log.isDebugEnabled())
+// {
+// _log.debug(method + " received on channel " + _channelId);
+// }
+// // XXX: multiple?
+// if (method instanceof MessageOkBody)
+// {
+// acknowledgeMessage(deliveryTag, false);
+// return true;
+// }
+// else
+// {
+// // TODO: implement reject
+// return false;
+// }
+// }
+// public void error(Exception e) {}
+// });
+// }
+
+ public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
+ {
+ // Do we need to refactor the content for a different frame size?
+ long maxFrameSize = _session.getFrameMax();
+ Iterable<ByteBuffer> contentItr = msg.getContents();
+ if (msg.getSize() > maxFrameSize)
+ {
+ Iterator<ByteBuffer> cItr = contentItr.iterator();
+ if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size
+ {
+ // TODO - Refactor the chunks for smaller outbound frame size
+ throw new Error("XXX TODO - need to refactor content chunks here");
+ // deliverRef(msg, destination, deliveryTag);
+ }
+ else
+ {
+ // Use ref content as is - no need to refactor
+ deliverRef(msg, destination, deliveryTag);
+ }
+ }
+ else
+ {
+ // Concatenate - all incoming chunks will fit into single outbound frame
+ deliverInline(msg, destination, deliveryTag);
+ }
+ }
+
+ public void deliverInline(AMQMessage msg, AMQShortString destination, final long deliveryTag)
+ {
+ deliverInline(msg, destination, new AMQMethodListener()
+ {
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ {
AMQMethodBody method = evt.getMethod();
- if (_log.isDebugEnabled()) {
+ if (_log.isDebugEnabled())
+ {
_log.debug(method + " received on channel " + _channelId);
}
// XXX: multiple?
- if (method instanceof MessageOkBody) {
+ if (method instanceof MessageOkBody)
+ {
acknowledgeMessage(deliveryTag, false);
return true;
- } else {
+ }
+ else
+ {
// TODO: implement reject
return false;
}
@@ -275,70 +391,127 @@
});
}
- public void deliver(AMQMessage msg, String destination, AMQMethodListener listener) {
- // XXX: should reframe if necessary to conform to max frame size
+ public void deliverInline(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
+ {
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize());
- for (ByteBuffer bb : msg.getContents()) {
+ for (ByteBuffer bb : msg.getContents())
+ {
buf.put(bb);
}
buf.flip();
mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
_session.writeRequest(_channelId, mtb, listener);
}
-
- protected void route(AMQMessage msg) throws AMQException
+
+ public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
{
- if (_transactional)
+ final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
+ AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+ _session.getProtocolMajorVersion(), // AMQP major version
+ _session.getProtocolMinorVersion(), // AMQP minor version
+ refId);
+ _session.writeRequest(_channelId, openBody, new AMQMethodListener()
{
- //don't create a transaction unless needed
- if (msg.isPersistent())
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
- _txnBuffer.containsPersistentChanges();
- }
-
- //A publication will result in the enlisting of several
- //TxnOps. The first is an op that will store the message.
- //Following that (and ordering is important), an op will
- //be added for every queue onto which the message is
- //enqueued. Finally a cleanup op will be added to decrement
- //the reference associated with the routing.
- Store storeOp = new Store(msg);
- _txnBuffer.enlist(storeOp);
- msg.setTxnBuffer(_txnBuffer);
- try
- {
- _exchanges.routeContent(msg);
- _txnBuffer.enlist(new Cleanup(msg));
- }
- catch (RequiredDeliveryException e)
- {
- //Can only be due to the mandatory flag, as no attempt
- //has yet been made to deliver the message. The
- //message will thus not have been delivered to any
- //queue so we can return the message (without killing
- //the transaction) and for efficiency remove the store
- //operation from the buffer.
- _txnBuffer.cancel(storeOp);
- throw e;
- }
- }
- else
- {
- try
- {
- _exchanges.routeContent(msg);
- //following check implements the functionality
- //required by the 'immediate' flag:
- msg.checkDeliveredToConsumer();
- }
- finally
- {
- msg.decrementReference();
+ AMQMethodBody method = evt.getMethod();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(method + " received on channel " + _channelId);
+ }
+ if (method instanceof MessageOkBody)
+ {
+ acknowledgeMessage(deliveryTag, false);
+ deliverRef(refId, msg, destination, _session.getStateManager());
+ return true;
+ }
+ else
+ {
+ // TODO: implement reject
+ return false;
+ }
}
- }
+ public void error(Exception e) {}
+ });
}
+
+ public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
+ {
+ MessageTransferBody mtb = msg.getTransferBody().copy();
+ mtb.destination = destination;
+ mtb.body = new Content(Content.TypeEnum.REF_T, refId);
+ _session.writeRequest(_channelId, mtb, listener);
+ for (ByteBuffer bb : msg.getContents())
+ {
+ ByteBuffer dup = bb.duplicate();
+ byte[] ba = new byte[dup.limit()];
+ dup.get(ba);
+ AMQMethodBody appendBody = MessageAppendBody.createMethodBody(
+ _session.getProtocolMajorVersion(), // AMQP major version
+ _session.getProtocolMinorVersion(), // AMQP minor version
+ ba,
+ refId);
+ _session.writeRequest(_channelId, appendBody, listener);
+ }
+ AMQMethodBody closeBody = MessageCloseBody.createMethodBody(
+ _session.getProtocolMajorVersion(), // AMQP major version
+ _session.getProtocolMinorVersion(), // AMQP minor version
+ refId);
+ }
+
+// protected void route(AMQMessage msg) throws AMQException
+// {
+// if (isTransactional())
+// {
+// //don't create a transaction unless needed
+// if (msg.isPersistent())
+// {
+// // _txnBuffer.containsPersistentChanges();
+// }
+//
+// //A publication will result in the enlisting of several
+// //TxnOps. The first is an op that will store the message.
+// //Following that (and ordering is important), an op will
+// //be added for every queue onto which the message is
+// //enqueued. Finally a cleanup op will be added to decrement
+// //the reference associated with the routing.
+// // Store storeOp = new Store(msg);
+// // _txnBuffer.enlist(storeOp);
+// // msg.setTxnBuffer(_txnBuffer);
+// try
+// {
+// _exchanges.routeContent(msg);
+// // _txnBuffer.enlist(new Cleanup(msg));
+// }
+// catch (RequiredDeliveryException e)
+// {
+// //Can only be due to the mandatory flag, as no attempt
+// //has yet been made to deliver the message. The
+// //message will thus not have been delivered to any
+// //queue so we can return the message (without killing
+// //the transaction) and for efficiency remove the store
+// //operation from the buffer.
+// // _txnBuffer.cancel(storeOp);
+// throw e;
+// }
+// }
+// else
+// {
+// try
+// {
+// _exchanges.routeContent(msg);
+// //following check implements the functionality
+// //required by the 'immediate' flag:
+// msg.checkDeliveredToConsumer();
+// }
+// finally
+// {
+// msg.decrementReference(_storeContext);
+// }
+// }
+// }
public RequestManager getRequestManager()
{
@@ -369,17 +542,19 @@
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
* @param noLocal
+ * @param exclusive
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue,
+ AMQProtocolSession session, boolean acks, FieldTable filters,
+ boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
- tag = "sgen_" + getNextConsumerTag();
+ tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
if (_consumerTag2QueueMap.containsKey(tag))
{
@@ -392,7 +567,7 @@
}
- public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException
+ public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
{
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
@@ -408,21 +583,16 @@
*/
public void close(AMQProtocolSession session) throws AMQException
{
- if (_transactional)
- {
- synchronized(_txnBuffer)
- {
- _txnBuffer.rollback();//releases messages
- }
- }
+ _txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
+ _txnContext.commit();
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
{
_log.info("Unsubscribing all consumers on channel " + toString());
- for (Map.Entry<String, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+ for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
{
me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
}
@@ -432,16 +602,16 @@
/**
* Add a message to the channel-based list of unacknowledged messages
*
- * @param message
- * @param deliveryTag
- * @param queue
+ * @param message the message that was delivered
+ * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
+ * the delivery tag)
+ * @param queue the queue from which the message was delivered
*/
- public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
+ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
{
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMap.getLock())
{
- _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
- _lastDeliveryTag = deliveryTag;
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
checkSuspension();
}
}
@@ -449,24 +619,19 @@
/**
* Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
* May result in delivery to this same channel or to other subscribers.
+ *
+ * @throws org.apache.qpid.AMQException if the requeue fails
*/
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Map<Long, UnacknowledgedMessage> currentList;
- synchronized(_unacknowledgedMessageMapLock)
- {
- currentList = _unacknowledgedMessageMap;
- _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
- }
+ Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
- for (UnacknowledgedMessage unacked : currentList.values())
+ for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
{
- unacked.message.setTxnBuffer(null);
-
- unacked.queue.deliver(unacked.message);
+ _txnContext.deliver(unacked.message, unacked.queue);
}
}
}
@@ -474,53 +639,87 @@
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(AMQProtocolSession session)
- {
- //messages go to this channel
- synchronized(_unacknowledgedMessageMapLock)
- {
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
- {
- long deliveryTag = entry.getKey();
- String consumerTag = entry.getValue().consumerTag;
- AMQMessage msg = entry.getValue().message;
- msg.setRedelivered(true);
- deliver(msg, consumerTag, deliveryTag);
- }
- }
- }
+ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+ {
+ throw new Error("XXX");
+// final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+//
+// _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+// {
+// public boolean callback(UnacknowledgedMessage message) throws AMQException
+// {
+// long deliveryTag = message.deliveryTag;
+// AMQShortString consumerTag = message.consumerTag;
+// AMQMessage msg = message.message;
+// msg.setRedelivered(true);
+// // working
+// // deliver(msg, consumerTag, deliveryTag);
+// // trunk
+// if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+// {
+// msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+// }
+// else
+// {
+// // Message has no consumer tag, so was "delivered" to a GET
+// // or consumer no longer registered
+// // cannot resend, so re-queue.
+// if (message.queue != null && (consumerTag == null || requeue))
+// {
+// msgToRequeue.add(message);
+// }
+// }
+// // false means continue processing
+// return false;
+// }
+//
+// public void visitComplete()
+// {
+// }
+// });
+//
+// for(UnacknowledgedMessage message : msgToRequeue)
+// {
+// _txnContext.deliver(message.message, message.queue);
+// _unacknowledgedMessageMap.remove(message.deliveryTag);
+// }
+ }
/**
* Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
* messages to remove the queue reference and also decrement any message reference counts, without
- * actually removing the item sine we may get an ack for a delivery tag that was generated from the
+ * actually removing the item since we may get an ack for a delivery tag that was generated from the
* deleted queue.
*
- * @param queue
+ * @param queue the queue that has been deleted
+ * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(final AMQQueue queue) throws AMQException
{
- synchronized(_unacknowledgedMessageMapLock)
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- final UnacknowledgedMessage unackedMsg = unacked.getValue();
- // we can compare the reference safely in this case
- if (unackedMsg.queue == queue)
+ if (message.queue == queue)
{
- unackedMsg.queue = null;
try
{
- unackedMsg.message.decrementReference();
+ message.discard(_storeContext);
+ message.queue = null;
}
catch (AMQException e)
{
- _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
+ _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
e, e);
}
}
+ return false;
}
- }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
@@ -533,151 +732,11 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- if (_transactional)
- {
- //check that the tag exists to give early failure
- if (!multiple || deliveryTag > 0)
- {
- checkAck(deliveryTag);
- }
- //we use a single txn op for all acks and update this op
- //as new acks come in. If this is the first ack in the txn
- //we will need to create and enlist the op.
- if (ackOp == null)
- {
- ackOp = new TxAck(new AckMap());
- _txnBuffer.enlist(ackOp);
- }
- //update the op to include this ack request
- if (multiple && deliveryTag == 0)
- {
- synchronized(_unacknowledgedMessageMapLock)
- {
- //if have signalled to ack all, that refers only
- //to all at this time
- ackOp.update(_lastDeliveryTag, multiple);
- }
- }
- else
- {
- ackOp.update(deliveryTag, multiple);
- }
- }
- else
- {
- handleAcknowledgement(deliveryTag, multiple);
- }
- }
-
- private void checkAck(long deliveryTag) throws AMQException
- {
- synchronized(_unacknowledgedMessageMapLock)
- {
- if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
- {
- throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
- }
- }
- }
-
- private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
- {
- if (_log.isDebugEnabled())
+ synchronized (_unacknowledgedMessageMap.getLock())
{
- _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
- " and multiple " + multiple);
- }
- if (multiple)
- {
- LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- synchronized(_unacknowledgedMessageMapLock)
- {
- if (deliveryTag == 0)
- {
- //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
- _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
- acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
- _unacknowledgedMessageMap.clear();
- }
- else
- {
- if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
- {
- throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
- }
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
-
- while (i.hasNext())
- {
-
- Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
-
- if (unacked.getKey() > deliveryTag)
- {
- //This should not occur now.
- throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
- }
-
- i.remove();
-
- acked.add(unacked.getValue());
- if (unacked.getKey() == deliveryTag)
- {
- break;
- }
- }
- }
- }// synchronized
-
- if (_log.isTraceEnabled())
- {
- _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
- acked.size() + " items.");
- }
-
- for (UnacknowledgedMessage msg : acked)
- {
- if (!_browsedAcks.contains(deliveryTag))
- {
- msg.discard();
- }
- else
- {
- _browsedAcks.remove(deliveryTag);
- }
- }
-
- }
- else
- {
- UnacknowledgedMessage msg;
- synchronized(_unacknowledgedMessageMapLock)
- {
- msg = _unacknowledgedMessageMap.remove(deliveryTag);
- }
-
- if (msg == null)
- {
- _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
- throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
- }
-
- if (!_browsedAcks.contains(deliveryTag))
- {
- msg.discard();
- }
- else
- {
- _browsedAcks.remove(deliveryTag);
- }
-
- if (_log.isTraceEnabled())
- {
- _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
- }
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+ checkSuspension();
}
-
- checkSuspension();
}
/**
@@ -685,19 +744,24 @@
*
* @return the map of unacknowledged messages
*/
- public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap()
+ public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
{
return _unacknowledgedMessageMap;
}
+ public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
+ {
+ _browsedAcks.add(deliveryTag);
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
private void checkSuspension()
{
boolean suspend;
- //noinspection SynchronizeOnNonFinalField
- synchronized(_unacknowledgedMessageMapLock)
- {
- suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
- }
+
+ suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+
setSuspended(suspend);
}
@@ -707,11 +771,8 @@
if (isSuspended && !suspended)
{
- synchronized(_unacknowledgedMessageMapLock)
- {
- // Continue being suspended if we are above the _prefetch_LowWaterMark
- suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
- }
+ // Continue being suspended if we are above the _prefetch_LowWaterMark
+ suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
}
boolean wasSuspended = _suspended.getAndSet(suspended);
@@ -740,33 +801,22 @@
public void commit() throws AMQException
{
- if (ackOp != null)
+ if (!isTransactional())
{
- ackOp.consolidate();
- if (ackOp.checkPersistent())
- {
- _txnBuffer.containsPersistentChanges();
- }
- ackOp = null;//already enlisted, after commit will reset regardless of outcome
+ throw new AMQException("Fatal error: commit called on non-transactional channel");
}
-
- _txnBuffer.commit();
- //TODO: may need to return 'immediate' messages at this point
+ _txnContext.commit();
}
public void rollback() throws AMQException
{
- //need to protect rollback and close from each other...
- synchronized(_txnBuffer)
- {
- _txnBuffer.rollback();
- }
+ _txnContext.rollback();
}
public String toString()
{
StringBuilder sb = new StringBuilder(30);
- sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional);
+ sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
sb.append("/").append(_prefetch_HighWaterMark);
return sb.toString();
@@ -782,129 +832,46 @@
return _defaultQueue;
}
- public void processReturns(AMQProtocolSession session)
- {
- for (AMQDataBlock block : _returns)
- {
- session.writeFrame(block);
- }
- _returns.clear();
- }
-
- public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+ public StoreContext getStoreContext()
{
- _browsedAcks.add(deliveryTag);
- addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ return _storeContext;
}
- //we use this wrapper to ensure we are always using the correct
- //map instance (its not final unfortunately)
- private class AckMap implements UnacknowledgedMessageMap
+ public void processReturns(AMQProtocolSession session) throws AMQException
{
- public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+ for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
- impl().collect(deliveryTag, multiple, msgs);
- }
-
- public void remove(List<UnacknowledgedMessage> msgs)
- {
- impl().remove(msgs);
- }
-
- private UnacknowledgedMessageMap impl()
- {
- return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
+ AMQMessage message = bouncedMessage.getAMQMessage();
+ session.writeResponse(_channelId, message.getMessageId(), message.getTransferBody());
+// message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage()));
}
+ _returnMessages.clear();
}
- private class Store implements TxnOp
- {
- //just use this to do a store of the message during the
- //prepare phase. Any enqueueing etc is done by TxnOps enlisted
- //by the queues themselves.
- private final AMQMessage _msg;
-
- Store(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
- {
- _msg.storeMessage();
- //the routers reference can now be released
- _msg.decrementReference();
- }
-
- public void undoPrepare()
- {
- }
-
- public void commit()
- {
- }
-
- public void rollback()
- {
- }
- }
- private class Cleanup implements TxnOp
+ public boolean wouldSuspend(AMQMessage msg)
{
- private final AMQMessage _msg;
-
- Cleanup(AMQMessage msg)
- {
- _msg = msg;
- }
-
- public void prepare() throws AMQException
+ if (isSuspended())
{
+ return true;
}
-
- public void undoPrepare()
- {
- //don't need to do anything here, if the store's txn failed
- //when processing prepare then the message was not stored
- //or enqueued on any queues and can be discarded
- }
-
- public void commit()
+ else
{
- //The routers reference can now be released. This is done
- //here to ensure that it happens after the queues that
- //enqueue it have incremented their counts (which as a
- //memory only operation is done in the commit phase).
- try
- {
- _msg.decrementReference();
- }
- catch (AMQException e)
- {
- _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
- }
- try
+ boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ if(!willSuspend)
{
- _msg.checkDeliveredToConsumer();
+ final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+
+ willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
}
- catch (NoConsumersException e)
+
+
+ if(willSuspend)
{
- //TODO: store this for delivery after the commit-ok
- throw new Error("XXX");
- //_returns.add(e.getReturnMessage(_channelId));
+ setSuspended(true);
}
+ return willSuspend;
}
- public void rollback()
- {
- }
}
-
- private static class Reference {
-
- public List<AMQMessage> messages = new LinkedList();
- public List<ByteBuffer> contents = new LinkedList();
-
- }
-
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/Main.java Wed Feb 14 12:02:03 2007
@@ -62,7 +62,7 @@
* Main entry point for AMQPD.
*
*/
-public class Main implements ProtocolVersionList, Managable
+public class Main implements ProtocolVersionList
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -70,7 +70,8 @@
private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
- private AMQBrokerManagerMBean _mbean = null;
+
+ private static Main _instance;
protected static class InitException extends Exception
{
@@ -265,7 +266,6 @@
}
bind(port, connectorConfig);
- createAndRegisterBrokerMBean();
}
protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
@@ -324,7 +324,7 @@
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
if (connectorConfig.enableNonSSL)
@@ -368,7 +368,7 @@
public static void main(String[] args)
{
- new Main(args);
+ _instance = new Main(args);
}
private byte[] parseIP(String address) throws Exception
@@ -430,21 +430,4 @@
}
}
- private void createAndRegisterBrokerMBean() throws AMQException
- {
- try
- {
- _mbean = new AMQBrokerManagerMBean();
- _mbean.register();
- }
- catch (JMException ex)
- {
- throw new AMQException("Exception occured in creating AMQBrokerManager MBean");
- }
- }
-
- public ManagedObject getManagedObject()
- {
- return _mbean;
- }
}