You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [5/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java Wed Aug 8 11:56:59 2007
@@ -40,18 +40,19 @@
import java.util.concurrent.ConcurrentHashMap;
/**
- * A factory of the ActiveMQ InitialContext which contains {@link ConnectionFactory}
- * instances as well as a child context called <i>destinations</i> which contain all of the
- * current active destinations, in child context depending on the QoS such as
- * transient or durable and queue or topic.
- *
+ * A factory of the ActiveMQ InitialContext which contains
+ * {@link ConnectionFactory} instances as well as a child context called
+ * <i>destinations</i> which contain all of the current active destinations, in
+ * child context depending on the QoS such as transient or durable and queue or
+ * topic.
+ *
* @version $Revision: 1.2 $
*/
public class ActiveMQInitialContextFactory implements InitialContextFactory {
- private static final String[] defaultConnectionFactoryNames = {
- "ConnectionFactory", "QueueConnectionFactory", "TopicConnectionFactory"
- };
+ private static final String[] defaultConnectionFactoryNames = {"ConnectionFactory",
+ "QueueConnectionFactory",
+ "TopicConnectionFactory"};
private String connectionPrefix = "connection.";
private String queuePrefix = "queue.";
@@ -62,34 +63,29 @@
Map data = new ConcurrentHashMap();
String[] names = getConnectionFactoryNames(environment);
for (int i = 0; i < names.length; i++) {
- ActiveMQConnectionFactory factory =null;
+ ActiveMQConnectionFactory factory = null;
String name = names[i];
- try{
- factory = createConnectionFactory(name, environment);
- }catch(Exception e){
+ try {
+ factory = createConnectionFactory(name, environment);
+ } catch (Exception e) {
throw new NamingException("Invalid broker URL");
}
- /* if( broker==null ) {
- try {
- broker = factory.getEmbeddedBroker();
- }
- catch (JMSException e) {
- log.warn("Failed to get embedded broker", e);
- }
- }
- */
- data.put(name,factory);
+ /*
+ * if( broker==null ) { try { broker = factory.getEmbeddedBroker(); }
+ * catch (JMSException e) { log.warn("Failed to get embedded
+ * broker", e); } }
+ */
+ data.put(name, factory);
}
createQueues(data, environment);
createTopics(data, environment);
/*
- if (broker != null) {
- data.put("destinations", broker.getDestinationContext(environment));
- }
- */
+ * if (broker != null) { data.put("destinations",
+ * broker.getDestinationContext(environment)); }
+ */
data.put("dynamicQueues", new LazyCreateContext() {
private static final long serialVersionUID = 6503881346214855588L;
@@ -109,7 +105,7 @@
}
// Properties
- //-------------------------------------------------------------------------
+ // -------------------------------------------------------------------------
public String getTopicPrefix() {
return topicPrefix;
}
@@ -127,19 +123,20 @@
}
// Implementation methods
- //-------------------------------------------------------------------------
+ // -------------------------------------------------------------------------
protected ReadOnlyContext createContext(Hashtable environment, Map data) {
return new ReadOnlyContext(environment, data);
}
- protected ActiveMQConnectionFactory createConnectionFactory(String name, Hashtable environment) throws URISyntaxException {
+ protected ActiveMQConnectionFactory createConnectionFactory(String name, Hashtable environment)
+ throws URISyntaxException {
Hashtable temp = new Hashtable(environment);
- String prefix = connectionPrefix+name+".";
+ String prefix = connectionPrefix + name + ".";
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry) iter.next();
- String key = (String) entry.getKey();
- if( key.startsWith(prefix) ) {
+ Map.Entry entry = (Map.Entry)iter.next();
+ String key = (String)entry.getKey();
+ if (key.startsWith(prefix)) {
// Rename the key...
temp.remove(key);
key = key.substring(prefix.length());
@@ -150,10 +147,11 @@
}
protected String[] getConnectionFactoryNames(Map environment) {
- String factoryNames = (String) environment.get("connectionFactoryNames");
+ String factoryNames = (String)environment.get("connectionFactoryNames");
if (factoryNames != null) {
List list = new ArrayList();
- for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens();) {
+ for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration
+ .hasMoreTokens();) {
list.add(enumeration.nextToken().trim());
}
int size = list.size();
@@ -168,7 +166,7 @@
protected void createQueues(Map data, Hashtable environment) {
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry) iter.next();
+ Map.Entry entry = (Map.Entry)iter.next();
String key = entry.getKey().toString();
if (key.startsWith(queuePrefix)) {
String jndiName = key.substring(queuePrefix.length());
@@ -179,7 +177,7 @@
protected void createTopics(Map data, Hashtable environment) {
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry) iter.next();
+ Map.Entry entry = (Map.Entry)iter.next();
String key = entry.getKey().toString();
if (key.startsWith(topicPrefix)) {
String jndiName = key.substring(topicPrefix.length());
@@ -201,11 +199,13 @@
protected Topic createTopic(String name) {
return new ActiveMQTopic(name);
}
-
+
/**
- * Factory method to create a new connection factory from the given environment
+ * Factory method to create a new connection factory from the given
+ * environment
*/
- protected ActiveMQConnectionFactory createConnectionFactory(Hashtable environment) throws URISyntaxException {
+ protected ActiveMQConnectionFactory createConnectionFactory(Hashtable environment)
+ throws URISyntaxException {
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory();
Properties properties = new Properties();
properties.putAll(environment);
@@ -216,10 +216,9 @@
public String getConnectionPrefix() {
return connectionPrefix;
}
-
public void setConnectionPrefix(String connectionPrefix) {
this.connectionPrefix = connectionPrefix;
}
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java Wed Aug 8 11:56:59 2007
@@ -19,12 +19,13 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+
/**
* Implementation of a Marshaller for byte arrays
*
* @version $Revision: 1.2 $
*/
-public class BytesMarshaller implements Marshaller{
+public class BytesMarshaller implements Marshaller {
/**
* Write the payload of this entry to the RawContainer
*
@@ -32,8 +33,8 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(Object object,DataOutput dataOut) throws IOException{
- byte[] data=(byte[]) object;
+ public void writePayload(Object object, DataOutput dataOut) throws IOException {
+ byte[] data = (byte[])object;
dataOut.writeInt(data.length);
dataOut.write(data);
}
@@ -45,9 +46,9 @@
* @return unmarshalled object
* @throws IOException
*/
- public Object readPayload(DataInput dataIn) throws IOException{
- int size=dataIn.readInt();
- byte[] data=new byte[size];
+ public Object readPayload(DataInput dataIn) throws IOException {
+ int size = dataIn.readInt();
+ byte[] data = new byte[size];
dataIn.readFully(data);
return data;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java Wed Aug 8 11:56:59 2007
@@ -20,77 +20,77 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+
/**
* Used by RootContainers
*
* @version $Revision: 1.1.1.1 $
*/
-public class ContainerId implements Externalizable{
- private static final long serialVersionUID=-8883779541021821943L;
+public class ContainerId implements Externalizable {
+ private static final long serialVersionUID = -8883779541021821943L;
private Object key;
private String dataContainerName;
public ContainerId() {
}
-
- public ContainerId(Object key,String dataContainerName) {
- this.key=key;
- this.dataContainerName=dataContainerName;
+
+ public ContainerId(Object key, String dataContainerName) {
+ this.key = key;
+ this.dataContainerName = dataContainerName;
}
-
-
+
/**
* @return Returns the dataContainerPrefix.
*/
- public String getDataContainerName(){
+ public String getDataContainerName() {
return dataContainerName;
}
/**
* @param dataContainerName The dataContainerPrefix to set.
*/
- public void setDataContainerName(String dataContainerName){
- this.dataContainerName=dataContainerName;
+ public void setDataContainerName(String dataContainerName) {
+ this.dataContainerName = dataContainerName;
}
/**
* @return Returns the key.
*/
- public Object getKey(){
+ public Object getKey() {
return key;
}
/**
* @param key The key to set.
*/
- public void setKey(Object key){
- this.key=key;
+ public void setKey(Object key) {
+ this.key = key;
}
-
- public int hashCode(){
+
+ public int hashCode() {
return key.hashCode();
}
-
- public boolean equals(Object obj){
+
+ public boolean equals(Object obj) {
boolean result = false;
- if (obj != null && obj instanceof ContainerId){
- ContainerId other = (ContainerId) obj;
+ if (obj != null && obj instanceof ContainerId) {
+ ContainerId other = (ContainerId)obj;
result = other.key.equals(this.key);
}
return result;
}
- public void writeExternal(ObjectOutput out) throws IOException{
+ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(getDataContainerName());
out.writeObject(key);
}
- public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
- dataContainerName=in.readUTF();
- key=in.readObject();
- }
-
- public String toString(){
- return "CID{"+dataContainerName + ":" + key + "}";
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ dataContainerName = in.readUTF();
+ key = in.readObject();
+ }
+
+ public String toString() {
+ return "CID{" + dataContainerName + ":" + key + "}";
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java Wed Aug 8 11:56:59 2007
@@ -18,16 +18,17 @@
import java.util.NoSuchElementException;
/**
- * Represents a container of persistent objects in the store Acts as a map, but values can be retrieved in insertion
- * order
+ * Represents a container of persistent objects in the store Acts as a map, but
+ * values can be retrieved in insertion order
*
* @version $Revision: 1.2 $
*/
-public interface ListContainer<V> extends List<V>{
+public interface ListContainer<V> extends List<V> {
/**
- * The container is created or retrieved in an unloaded state. load populates the container with all the indexes
- * used etc and should be called before any operations on the container
+ * The container is created or retrieved in an unloaded state. load
+ * populates the container with all the indexes used etc and should be
+ * called before any operations on the container
*/
public void load();
@@ -43,7 +44,8 @@
public boolean isLoaded();
/**
- * For homogenous containers can set a custom marshaller for loading values The default uses Object serialization
+ * For homogenous containers can set a custom marshaller for loading values
+ * The default uses Object serialization
*
* @param marshaller
*/
@@ -67,8 +69,8 @@
public void addFirst(V o);
/**
- * Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
- * only for consistency.)
+ * Appends the given element to the end of this list. (Identical in function
+ * to the <tt>add</tt> method; included only for consistency.)
*
* @param o the element to be inserted at the end of this list.
*/
@@ -91,7 +93,8 @@
public V removeLast();
/**
- * remove an object from the list without retrieving the old value from the store
+ * remove an object from the list without retrieving the old value from the
+ * store
*
* @param position
* @return true if successful
@@ -107,7 +110,8 @@
public StoreEntry placeLast(V object);
/**
- * insert an Object in first position in the list but get a StoreEntry of its position
+ * insert an Object in first position in the list but get a StoreEntry of
+ * its position
*
* @param object
* @return the location in the Store
@@ -115,12 +119,13 @@
public StoreEntry placeFirst(V object);
/**
- * Advanced feature = must ensure the object written doesn't overwrite other objects in the container
+ * Advanced feature = must ensure the object written doesn't overwrite other
+ * objects in the container
*
* @param entry
* @param object
*/
- public void update(StoreEntry entry,V object);
+ public void update(StoreEntry entry, V object);
/**
* Retrieve an Object from the Store by its location
@@ -167,10 +172,11 @@
* @return true if successful
*/
public boolean remove(StoreEntry entry);
-
+
/**
- * It's possible that a StoreEntry could become stale;
- * this will return an up-to-date entry for the StoreEntry position
+ * It's possible that a StoreEntry could become stale; this will return an
+ * up-to-date entry for the StoreEntry position
+ *
* @param entry old entry
* @return a refreshed StoreEntry
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Wed Aug 8 11:56:59 2007
@@ -21,47 +21,48 @@
import java.util.Set;
/**
- *Represents a container of persistent objects in the store
- *Acts as a map, but values can be retrieved in insertion order
+ * Represents a container of persistent objects in the store Acts as a map, but
+ * values can be retrieved in insertion order
*
* @version $Revision: 1.2 $
*/
-public interface MapContainer<K, V> extends Map<K, V>{
-
-
- /**
- * The container is created or retrieved in
- * an unloaded state.
- * load populates the container with all the indexes used etc
- * and should be called before any operations on the container
+public interface MapContainer<K, V> extends Map<K, V> {
+
+ /**
+ * The container is created or retrieved in an unloaded state. load
+ * populates the container with all the indexes used etc and should be
+ * called before any operations on the container
*/
public void load();
-
+
/**
* unload indexes from the container
- *
+ *
*/
public void unload();
-
+
/**
* @return true if the indexes are loaded
*/
public boolean isLoaded();
-
+
/**
* For homogenous containers can set a custom marshaller for loading keys
* The default uses Object serialization
+ *
* @param keyMarshaller
*/
public void setKeyMarshaller(Marshaller<K> keyMarshaller);
-
+
/**
* For homogenous containers can set a custom marshaller for loading values
* The default uses Object serialization
- * @param valueMarshaller
-
+ *
+ * @param valueMarshaller
+ *
*/
public void setValueMarshaller(Marshaller<V> valueMarshaller);
+
/**
* @return the id the MapContainer was create with
*/
@@ -78,30 +79,31 @@
public boolean isEmpty();
/**
- * @param key
+ * @param key
* @return true if the container contains the key
*/
public boolean containsKey(K key);
/**
* Get the value associated with the key
- * @param key
+ *
+ * @param key
* @return the value associated with the key from the store
*/
public V get(K key);
-
/**
- * @param o
+ * @param o
* @return true if the MapContainer contains the value o
*/
public boolean containsValue(K o);
/**
* Add all entries in the supplied Map
+ *
* @param map
*/
- public void putAll(Map<K,V> map);
+ public void putAll(Map<K, V> map);
/**
* @return a Set of all the keys
@@ -109,30 +111,30 @@
public Set<K> keySet();
/**
- * @return a collection of all the values - the values will be lazily pulled out of the
- * store if iterated etc.
+ * @return a collection of all the values - the values will be lazily pulled
+ * out of the store if iterated etc.
*/
public Collection<V> values();
/**
- * @return a Set of all the Map.Entry instances - the values will be lazily pulled out of the
- * store if iterated etc.
+ * @return a Set of all the Map.Entry instances - the values will be lazily
+ * pulled out of the store if iterated etc.
*/
- public Set<Map.Entry<K,V>> entrySet();
+ public Set<Map.Entry<K, V>> entrySet();
-
/**
* Add an entry
+ *
* @param key
* @param value
* @return the old value for the key
*/
- public V put(K key,V value);
-
+ public V put(K key, V value);
/**
* remove an entry associated with the key
- * @param key
+ *
+ * @param key
* @return the old value associated with the key or null
*/
public V remove(K key);
@@ -141,77 +143,83 @@
* empty the container
*/
public void clear();
-
+
/**
* Add an entry to the Store Map
+ *
* @param key
* @param Value
* @return the StoreEntry associated with the entry
*/
public StoreEntry place(K key, V Value);
-
+
/**
* Remove an Entry from the Map
+ *
* @param entry
*/
public void remove(StoreEntry entry);
-
+
/**
* Get the Key object from its location
+ *
* @param keyLocation
* @return the key for the entry
*/
public K getKey(StoreEntry keyLocation);
-
+
/**
* Get the value from its location
+ *
* @param Valuelocation
* @return the Object
*/
public V getValue(StoreEntry Valuelocation);
-
- /** Get the StoreEntry for the first value in the Map
- *
- * @return the first StoreEntry or null if the map is empty
- */
- public StoreEntry getFirst();
-
- /**
- * Get the StoreEntry for the last value item of the Map
- *
- * @return the last StoreEntry or null if the list is empty
- */
- public StoreEntry getLast();
-
- /**
- * Get the next StoreEntry value from the map
- *
- * @param entry
- * @return the next StoreEntry or null
- */
- public StoreEntry getNext(StoreEntry entry);
-
- /**
- * Get the previous StoreEntry from the map
- *
- * @param entry
- * @return the previous store entry or null
- */
- public StoreEntry getPrevious(StoreEntry entry);
-
-
- /**
- * It's possible that a StoreEntry could become stale;
- * this will return an up-to-date entry for the StoreEntry position
- * @param entry old entry
- * @return a refreshed StoreEntry
- */
- public StoreEntry refresh(StoreEntry entry);
-
- /**
- * Get the StoreEntry associated with the key
- * @param key
- * @return the StoreEntry
- */
- public StoreEntry getEntry(K key);
+
+ /**
+ * Get the StoreEntry for the first value in the Map
+ *
+ * @return the first StoreEntry or null if the map is empty
+ */
+ public StoreEntry getFirst();
+
+ /**
+ * Get the StoreEntry for the last value item of the Map
+ *
+ * @return the last StoreEntry or null if the list is empty
+ */
+ public StoreEntry getLast();
+
+ /**
+ * Get the next StoreEntry value from the map
+ *
+ * @param entry
+ * @return the next StoreEntry or null
+ */
+ public StoreEntry getNext(StoreEntry entry);
+
+ /**
+ * Get the previous StoreEntry from the map
+ *
+ * @param entry
+ * @return the previous store entry or null
+ */
+ public StoreEntry getPrevious(StoreEntry entry);
+
+ /**
+ * It's possible that a StoreEntry could become stale; this will return an
+ * up-to-date entry for the StoreEntry position
+ *
+ * @param entry old entry
+ * @return a refreshed StoreEntry
+ */
+ public StoreEntry refresh(StoreEntry entry);
+
+ /**
+ * Get the StoreEntry associated with the key
+ *
+ * @param key
+ * @return the StoreEntry
+ */
+ public StoreEntry getEntry(K key);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageIdMarshaller.java Wed Aug 8 11:56:59 2007
@@ -20,6 +20,7 @@
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.command.MessageId;
+
/**
* Implementation of a Marshaller for MessageIds
*
@@ -33,7 +34,7 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(MessageId object,DataOutput dataOut) throws IOException{
+ public void writePayload(MessageId object, DataOutput dataOut) throws IOException {
dataOut.writeUTF(object.toString());
}
@@ -44,7 +45,7 @@
* @return unmarshalled object
* @throws IOException
*/
- public MessageId readPayload(DataInput dataIn) throws IOException{
+ public MessageId readPayload(DataInput dataIn) throws IOException {
return new MessageId(dataIn.readUTF());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java Wed Aug 8 11:56:59 2007
@@ -27,7 +27,7 @@
*
* @version $Revision: 1.2 $
*/
-public class ObjectMarshaller implements Marshaller{
+public class ObjectMarshaller implements Marshaller {
/**
* Write the payload of this entry to the RawContainer
@@ -36,12 +36,12 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(Object object,DataOutput dataOut) throws IOException{
- ByteArrayOutputStream bytesOut=new ByteArrayOutputStream();
- ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
+ public void writePayload(Object object, DataOutput dataOut) throws IOException {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ ObjectOutputStream objectOut = new ObjectOutputStream(bytesOut);
objectOut.writeObject(object);
objectOut.close();
- byte[] data=bytesOut.toByteArray();
+ byte[] data = bytesOut.toByteArray();
dataOut.writeInt(data.length);
dataOut.write(data);
}
@@ -53,15 +53,15 @@
* @return unmarshalled object
* @throws IOException
*/
- public Object readPayload(DataInput dataIn) throws IOException{
- int size=dataIn.readInt();
- byte[] data=new byte[size];
+ public Object readPayload(DataInput dataIn) throws IOException {
+ int size = dataIn.readInt();
+ byte[] data = new byte[size];
dataIn.readFully(data);
- ByteArrayInputStream bytesIn=new ByteArrayInputStream(data);
- ObjectInputStream objectIn=new ObjectInputStream(bytesIn);
- try{
+ ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
+ ObjectInputStream objectIn = new ObjectInputStream(bytesIn);
+ try {
return objectIn.readObject();
- }catch(ClassNotFoundException e){
+ } catch (ClassNotFoundException e) {
throw new IOException(e.getMessage());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/RuntimeStoreException.java Wed Aug 8 11:56:59 2007
@@ -16,47 +16,48 @@
*/
package org.apache.activemq.kaha;
-
/**
-*Runtime exception for the Store
-*
-* @version $Revision: 1.2 $
-*/
-
-public class RuntimeStoreException extends RuntimeException{
-
-
- private static final long serialVersionUID=8807084681372365173L;
+ * Runtime exception for the Store
+ *
+ * @version $Revision: 1.2 $
+ */
+
+public class RuntimeStoreException extends RuntimeException {
+
+ private static final long serialVersionUID = 8807084681372365173L;
/**
* Constructor
*/
- public RuntimeStoreException(){
+ public RuntimeStoreException() {
super();
}
/**
* Constructor
+ *
* @param message
*/
- public RuntimeStoreException(String message){
+ public RuntimeStoreException(String message) {
super(message);
}
/**
* Constructor
+ *
* @param message
* @param cause
*/
- public RuntimeStoreException(String message,Throwable cause){
- super(message,cause);
+ public RuntimeStoreException(String message, Throwable cause) {
+ super(message, cause);
}
/**
* Constructor
+ *
* @param cause
*/
- public RuntimeStoreException(Throwable cause){
+ public RuntimeStoreException(Throwable cause) {
super(cause);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Wed Aug 8 11:56:59 2007
@@ -18,37 +18,38 @@
import java.io.IOException;
import java.util.Set;
+
/**
* A Store holds persistent containers
*
* @version $Revision: 1.2 $
*/
-public interface Store{
+public interface Store {
/**
* Default container name
*/
- public static final String DEFAULT_CONTAINER_NAME="kaha";
+ public static final String DEFAULT_CONTAINER_NAME = "kaha";
/**
* Byte Marshaller
*/
public final static Marshaller BytesMarshaller = new BytesMarshaller();
-
+
/**
* Object Marshaller
*/
public final static Marshaller ObjectMarshaller = new ObjectMarshaller();
-
+
/**
* String Marshaller
*/
public final static Marshaller StringMarshaller = new StringMarshaller();
-
-
+
/**
* Command Marshaller
*/
public final static Marshaller CommandMarshaller = new CommandMarshaller();
+
/**
* close the store
*
@@ -86,46 +87,50 @@
* @throws IOException
*/
public boolean doesMapContainerExist(Object id) throws IOException;
-
+
/**
* Checks if a MapContainer exists in the named container
*
* @param id
- * @param containerName
+ * @param containerName
* @return true if the MapContainer exists
* @throws IOException
*/
- public boolean doesMapContainerExist(Object id,String containerName) throws IOException;
+ public boolean doesMapContainerExist(Object id, String containerName) throws IOException;
/**
- * Get a MapContainer with the given id - the MapContainer is created if needed
+ * Get a MapContainer with the given id - the MapContainer is created if
+ * needed
*
* @param id
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
- public MapContainer getMapContainer(Object id) throws IOException;
+ public MapContainer getMapContainer(Object id) throws IOException;
/**
- * Get a MapContainer with the given id - the MapContainer is created if needed
+ * Get a MapContainer with the given id - the MapContainer is created if
+ * needed
*
* @param id
* @param containerName
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
- public MapContainer getMapContainer(Object id,String containerName) throws IOException;
-
+ public MapContainer getMapContainer(Object id, String containerName) throws IOException;
+
/**
- * Get a MapContainer with the given id - the MapContainer is created if needed
+ * Get a MapContainer with the given id - the MapContainer is created if
+ * needed
*
* @param id
* @param containerName
- * @param persistentIndex
+ * @param persistentIndex
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
- public MapContainer getMapContainer(Object id,String containerName,boolean persistentIndex) throws IOException;
+ public MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
+ throws IOException;
/**
* delete a container from the default container
@@ -134,18 +139,19 @@
* @throws IOException
*/
public void deleteMapContainer(Object id) throws IOException;
-
+
/**
* delete a MapContainer from the named container
*
* @param id
- * @param containerName
+ * @param containerName
* @throws IOException
*/
- public void deleteMapContainer(Object id,String containerName) throws IOException;
-
+ public void deleteMapContainer(Object id, String containerName) throws IOException;
+
/**
* Delete Map container
+ *
* @param id
* @throws IOException
*/
@@ -167,16 +173,16 @@
* @throws IOException
*/
public boolean doesListContainerExist(Object id) throws IOException;
-
+
/**
* Checks if a ListContainer exists in the named container
*
* @param id
- * @param containerName
+ * @param containerName
* @return true if the ListContainer exists
* @throws IOException
*/
- public boolean doesListContainerExist(Object id,String containerName) throws IOException;
+ public boolean doesListContainerExist(Object id, String containerName) throws IOException;
/**
* Get a ListContainer with the given id and creates it if it doesn't exist
@@ -185,7 +191,7 @@
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
- public ListContainer getListContainer(Object id) throws IOException;
+ public ListContainer getListContainer(Object id) throws IOException;
/**
* Get a ListContainer with the given id and creates it if it doesn't exist
@@ -195,18 +201,19 @@
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
- public ListContainer getListContainer(Object id,String containerName) throws IOException;
-
+ public ListContainer getListContainer(Object id, String containerName) throws IOException;
+
/**
* Get a ListContainer with the given id and creates it if it doesn't exist
*
* @param id
* @param containerName
- * @param persistentIndex
+ * @param persistentIndex
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
- public ListContainer getListContainer(Object id,String containerName,boolean persistentIndex) throws IOException;
+ public ListContainer getListContainer(Object id, String containerName, boolean persistentIndex)
+ throws IOException;
/**
* delete a ListContainer from the default container
@@ -215,18 +222,19 @@
* @throws IOException
*/
public void deleteListContainer(Object id) throws IOException;
-
+
/**
* delete a ListContainer from the named container
*
* @param id
- * @param containerName
+ * @param containerName
* @throws IOException
*/
- public void deleteListContainer(Object id,String containerName) throws IOException;
+ public void deleteListContainer(Object id, String containerName) throws IOException;
/**
* delete a list container
+ *
* @param id
* @throws IOException
*/
@@ -239,7 +247,7 @@
* @throws IOException
*/
public Set<ContainerId> getListContainerIds() throws IOException;
-
+
/**
* @return the maxDataFileLength
*/
@@ -249,20 +257,21 @@
* @param maxDataFileLength the maxDataFileLength to set
*/
public void setMaxDataFileLength(long maxDataFileLength);
-
+
/**
* @see org.apache.activemq.kaha.IndexTypes
* @return the default index type
*/
public String getIndexTypeAsString();
-
+
/**
* Set the default index type
+ *
* @param type
* @see org.apache.activemq.kaha.IndexTypes
*/
public void setIndexTypeAsString(String type);
-
+
/**
* @return true if the store has been initialized
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java Wed Aug 8 11:56:59 2007
@@ -25,28 +25,29 @@
*
* @version $Revision: 1.2 $
*/
-public class StoreFactory{
-
+public class StoreFactory {
/**
* open or create a Store
+ *
* @param name
* @param mode
* @return the opened/created store
* @throws IOException
*/
- public static Store open(String name,String mode) throws IOException{
- return new KahaStore(name,mode);
+ public static Store open(String name, String mode) throws IOException {
+ return new KahaStore(name, mode);
}
-
+
/**
* Delete a database
+ *
* @param name of the database
* @return true if successful
- * @throws IOException
+ * @throws IOException
*/
- public static boolean delete(String name) throws IOException{
- KahaStore store = new KahaStore(name,"rw");
+ public static boolean delete(String name) throws IOException {
+ KahaStore store = new KahaStore(name, "rw");
return store.delete();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java Wed Aug 8 11:56:59 2007
@@ -19,6 +19,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+
/**
* Implementation of a Marshaller for Strings
*
@@ -32,7 +33,7 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(String object,DataOutput dataOut) throws IOException{
+ public void writePayload(String object, DataOutput dataOut) throws IOException {
dataOut.writeUTF(object);
}
@@ -43,7 +44,7 @@
* @return unmarshalled object
* @throws IOException
*/
- public String readPayload(DataInput dataIn) throws IOException{
+ public String readPayload(DataInput dataIn) throws IOException {
return dataIn.readUTF();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Wed Aug 8 11:56:59 2007
@@ -8,35 +8,32 @@
public interface DataManager {
- String getName();
+ String getName();
- Object readItem(Marshaller marshaller, StoreLocation item)
- throws IOException;
+ Object readItem(Marshaller marshaller, StoreLocation item) throws IOException;
- StoreLocation storeDataItem(Marshaller marshaller, Object payload)
- throws IOException;
+ StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException;
- StoreLocation storeRedoItem(Object payload) throws IOException;
+ StoreLocation storeRedoItem(Object payload) throws IOException;
- void updateItem(StoreLocation location, Marshaller marshaller,
- Object payload) throws IOException;
+ void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException;
- void recoverRedoItems(RedoListener listener) throws IOException;
+ void recoverRedoItems(RedoListener listener) throws IOException;
- void close() throws IOException;
+ void close() throws IOException;
- void force() throws IOException;
+ void force() throws IOException;
- boolean delete() throws IOException;
+ boolean delete() throws IOException;
- void addInterestInFile(int file) throws IOException;
+ void addInterestInFile(int file) throws IOException;
- void removeInterestInFile(int file) throws IOException;
+ void removeInterestInFile(int file) throws IOException;
- void consolidateDataFiles() throws IOException;
+ void consolidateDataFiles() throws IOException;
- Marshaller getRedoMarshaller();
+ Marshaller getRedoMarshaller();
- void setRedoMarshaller(Marshaller redoMarshaller);
+ void setRedoMarshaller(Marshaller redoMarshaller);
-}
\ No newline at end of file
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Wed Aug 8 11:56:59 2007
@@ -35,58 +35,55 @@
import java.util.concurrent.ConcurrentHashMap;
/**
-* A container of roots for other Containers
-*
-* @version $Revision: 1.2 $
-*/
+ * A container of roots for other Containers
+ *
+ * @version $Revision: 1.2 $
+ */
class IndexRootContainer {
- private static final Log log=LogFactory.getLog(IndexRootContainer.class);
+ private static final Log log = LogFactory.getLog(IndexRootContainer.class);
protected static final Marshaller rootMarshaller = Store.ObjectMarshaller;
protected IndexItem root;
protected IndexManager indexManager;
protected DataManager dataManager;
protected Map map = new ConcurrentHashMap();
protected LinkedList list = new LinkedList();
-
-
- IndexRootContainer(IndexItem root,IndexManager im,DataManager dfm) throws IOException{
- this.root=root;
- this.indexManager=im;
- this.dataManager=dfm;
- long nextItem=root.getNextItem();
- while(nextItem!=Item.POSITION_NOT_SET){
- StoreEntry item=indexManager.getIndex(nextItem);
- StoreLocation data=item.getKeyDataItem();
- Object key = dataManager.readItem(rootMarshaller,data);
- map.put(key,item);
+
+ IndexRootContainer(IndexItem root, IndexManager im, DataManager dfm) throws IOException {
+ this.root = root;
+ this.indexManager = im;
+ this.dataManager = dfm;
+ long nextItem = root.getNextItem();
+ while (nextItem != Item.POSITION_NOT_SET) {
+ StoreEntry item = indexManager.getIndex(nextItem);
+ StoreLocation data = item.getKeyDataItem();
+ Object key = dataManager.readItem(rootMarshaller, data);
+ map.put(key, item);
list.add(item);
- nextItem=item.getNextItem();
+ nextItem = item.getNextItem();
dataManager.addInterestInFile(item.getKeyFile());
}
}
-
- Set getKeys(){
+
+ Set getKeys() {
return map.keySet();
}
-
-
-
- IndexItem addRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
- if (map.containsKey(key)){
- removeRoot(containerIndexManager,key);
+
+ IndexItem addRoot(IndexManager containerIndexManager, ContainerId key) throws IOException {
+ if (map.containsKey(key)) {
+ removeRoot(containerIndexManager, key);
}
-
+
StoreLocation data = dataManager.storeDataItem(rootMarshaller, key);
IndexItem newRoot = indexManager.createNewIndex();
newRoot.setKeyData(data);
IndexItem containerRoot = containerIndexManager.createNewIndex();
containerIndexManager.storeIndex(containerRoot);
newRoot.setValueOffset(containerRoot.getOffset());
-
- IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();
- last=last==null?root:last;
- long prev=last.getOffset();
+
+ IndexItem last = list.isEmpty() ? null : (IndexItem)list.getLast();
+ last = last == null ? root : last;
+ long prev = last.getOffset();
newRoot.setPreviousItem(prev);
indexManager.storeIndex(newRoot);
last.setNextItem(newRoot.getOffset());
@@ -95,25 +92,25 @@
list.add(newRoot);
return containerRoot;
}
-
- void removeRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
- StoreEntry oldRoot=(StoreEntry)map.remove(key);
- if(oldRoot!=null){
+
+ void removeRoot(IndexManager containerIndexManager, ContainerId key) throws IOException {
+ StoreEntry oldRoot = (StoreEntry)map.remove(key);
+ if (oldRoot != null) {
dataManager.removeInterestInFile(oldRoot.getKeyFile());
// get the container root
- IndexItem containerRoot=containerIndexManager.getIndex(oldRoot.getValueOffset());
- if(containerRoot!=null){
+ IndexItem containerRoot = containerIndexManager.getIndex(oldRoot.getValueOffset());
+ if (containerRoot != null) {
containerIndexManager.freeIndex(containerRoot);
}
- int index=list.indexOf(oldRoot);
- IndexItem prev=index>0?(IndexItem)list.get(index-1):root;
- prev=prev==null?root:prev;
- IndexItem next=index<(list.size()-1)?(IndexItem)list.get(index+1):null;
- if(next!=null){
+ int index = list.indexOf(oldRoot);
+ IndexItem prev = index > 0 ? (IndexItem)list.get(index - 1) : root;
+ prev = prev == null ? root : prev;
+ IndexItem next = index < (list.size() - 1) ? (IndexItem)list.get(index + 1) : null;
+ if (next != null) {
prev.setNextItem(next.getOffset());
next.setPreviousItem(prev.getOffset());
indexManager.updateIndexes(next);
- }else{
+ } else {
prev.setNextItem(Item.POSITION_NOT_SET);
}
indexManager.updateIndexes(prev);
@@ -121,19 +118,17 @@
indexManager.freeIndex((IndexItem)oldRoot);
}
}
-
- IndexItem getRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
- StoreEntry index = (StoreEntry) map.get(key);
- if (index != null){
+
+ IndexItem getRoot(IndexManager containerIndexManager, ContainerId key) throws IOException {
+ StoreEntry index = (StoreEntry)map.get(key);
+ if (index != null) {
return containerIndexManager.getIndex(index.getValueOffset());
}
return null;
}
-
- boolean doesRootExist(Object key){
+
+ boolean doesRootExist(Object key) {
return map.containsKey(key);
}
-
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreLockedExcpetion.java Wed Aug 8 11:56:59 2007
@@ -18,27 +18,25 @@
import java.io.IOException;
-
-
/**
-* Exception thrown if the store is in use by another application
-*
-* @version $Revision: 1.1.1.1 $
-*/
-public class StoreLockedExcpetion extends IOException{
+ * Exception thrown if the store is in use by another application
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StoreLockedExcpetion extends IOException {
- private static final long serialVersionUID=3857646689671366926L;
+ private static final long serialVersionUID = 3857646689671366926L;
/**
* Default Constructor
*/
- public StoreLockedExcpetion(){
+ public StoreLockedExcpetion() {
}
/**
* @param s
*/
- public StoreLockedExcpetion(String s){
+ public StoreLockedExcpetion(String s) {
super(s);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java Wed Aug 8 11:56:59 2007
@@ -23,129 +23,130 @@
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.util.ByteSequence;
+
/**
- * Optimized Store reader and updater. Single threaded and synchronous. Use in conjunction
- * with the DataFileAccessorPool of concurrent use.
+ * Optimized Store reader and updater. Single threaded and synchronous. Use in
+ * conjunction with the DataFileAccessorPool of concurrent use.
*
* @version $Revision: 1.1.1.1 $
*/
final class DataFileAccessor {
-
- private final DataFile dataFile;
- private final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
- private final RandomAccessFile file;
- private boolean disposed;
-
+
+ private final DataFile dataFile;
+ private final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
+ private final RandomAccessFile file;
+ private boolean disposed;
+
/**
* Construct a Store reader
*
* @param fileId
- * @throws IOException
+ * @throws IOException
*/
- public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException{
- this.dataFile = dataFile;
- this.inflightWrites = dataManager.getInflightWrites();
- this.file = dataFile.openRandomAccessFile(false);
+ public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException {
+ this.dataFile = dataFile;
+ this.inflightWrites = dataManager.getInflightWrites();
+ this.file = dataFile.openRandomAccessFile(false);
+ }
+
+ public DataFile getDataFile() {
+ return dataFile;
}
- public DataFile getDataFile() {
- return dataFile;
- }
-
- public void dispose() {
- if( disposed )
- return;
- disposed=true;
+ public void dispose() {
+ if (disposed)
+ return;
+ disposed = true;
try {
- dataFile.closeRandomAccessFile(file);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ dataFile.closeRandomAccessFile(file);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
-
+
public ByteSequence readRecord(Location location) throws IOException {
-
- if( !location.isValid() )
- throw new IOException("Invalid location: "+location);
-
- WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
- if( asyncWrite!= null ) {
- return asyncWrite.data;
- }
-
- try {
-
- if( location.getSize()==Location.NOT_SET ) {
- file.seek(location.getOffset());
- location.setSize(file.readInt());
- file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
- } else {
- file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
- }
-
- byte[] data=new byte[location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
- file.readFully(data);
- return new ByteSequence(data, 0, data.length);
-
- } catch (RuntimeException e) {
- throw new IOException("Invalid location: "+location+", : "+e);
- }
+
+ if (!location.isValid())
+ throw new IOException("Invalid location: " + location);
+
+ WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+ if (asyncWrite != null) {
+ return asyncWrite.data;
+ }
+
+ try {
+
+ if (location.getSize() == Location.NOT_SET) {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+ } else {
+ file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+ }
+
+ byte[] data = new byte[location.getSize() - AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
+ file.readFully(data);
+ return new ByteSequence(data, 0, data.length);
+
+ } catch (RuntimeException e) {
+ throw new IOException("Invalid location: " + location + ", : " + e);
+ }
}
-
+
public void readLocationDetails(Location location) throws IOException {
- WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
- if( asyncWrite!= null ) {
- location.setSize(asyncWrite.location.getSize());
- location.setType(asyncWrite.location.getType());
- } else {
- file.seek(location.getOffset());
- location.setSize(file.readInt());
- location.setType(file.readByte());
- }
- }
-
- public boolean readLocationDetailsAndValidate(Location location) {
- try {
- WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
- if( asyncWrite!= null ) {
- location.setSize(asyncWrite.location.getSize());
- location.setType(asyncWrite.location.getType());
- } else {
- file.seek(location.getOffset());
- location.setSize(file.readInt());
- location.setType(file.readByte());
-
- byte data[] = new byte[3];
- file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
- file.readFully(data);
- if( data[0] != AsyncDataManager.ITEM_HEAD_SOR[0] ||
- data[1] != AsyncDataManager.ITEM_HEAD_SOR[1] ||
- data[2] != AsyncDataManager.ITEM_HEAD_SOR[2] ) {
- return false;
- }
- file.seek(location.getOffset()+location.getSize()-AsyncDataManager.ITEM_FOOT_SPACE);
- file.readFully(data);
- if( data[0] != AsyncDataManager.ITEM_HEAD_EOR[0] ||
- data[1] != AsyncDataManager.ITEM_HEAD_EOR[1] ||
- data[2] != AsyncDataManager.ITEM_HEAD_EOR[2] ) {
- return false;
- }
- }
- } catch (IOException e) {
- return false;
- }
- return true;
- }
-
- public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
-
- file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
- int size = Math.min(data.getLength(), location.getSize());
- file.write(data.getData(), data.getOffset(), size);
- if( sync ) {
- file.getFD().sync();
- }
-
- }
+ WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+ if (asyncWrite != null) {
+ location.setSize(asyncWrite.location.getSize());
+ location.setType(asyncWrite.location.getType());
+ } else {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ location.setType(file.readByte());
+ }
+ }
+
+ public boolean readLocationDetailsAndValidate(Location location) {
+ try {
+ WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+ if (asyncWrite != null) {
+ location.setSize(asyncWrite.location.getSize());
+ location.setType(asyncWrite.location.getType());
+ } else {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ location.setType(file.readByte());
+
+ byte data[] = new byte[3];
+ file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
+ file.readFully(data);
+ if (data[0] != AsyncDataManager.ITEM_HEAD_SOR[0]
+ || data[1] != AsyncDataManager.ITEM_HEAD_SOR[1]
+ || data[2] != AsyncDataManager.ITEM_HEAD_SOR[2]) {
+ return false;
+ }
+ file.seek(location.getOffset() + location.getSize() - AsyncDataManager.ITEM_FOOT_SPACE);
+ file.readFully(data);
+ if (data[0] != AsyncDataManager.ITEM_HEAD_EOR[0]
+ || data[1] != AsyncDataManager.ITEM_HEAD_EOR[1]
+ || data[2] != AsyncDataManager.ITEM_HEAD_EOR[2]) {
+ return false;
+ }
+ }
+ } catch (IOException e) {
+ return false;
+ }
+ return true;
+ }
+
+ public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
+
+ file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+ int size = Math.min(data.getLength(), location.getSize());
+ file.write(data.getData(), data.getOffset(), size);
+ if (sync) {
+ file.getFD().sync();
+ }
+
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Wed Aug 8 11:56:59 2007
@@ -27,348 +27,351 @@
import org.apache.activemq.util.LinkedNode;
/**
- * An optimized writer to do batch appends to a data file. This object is thread safe
- * and gains throughput as you increase the number of concurrent writes it does.
+ * An optimized writer to do batch appends to a data file. This object is thread
+ * safe and gains throughput as you increase the number of concurrent writes it
+ * does.
*
* @version $Revision: 1.1.1.1 $
*/
class DataFileAppender {
-
- protected static final byte []RESERVED_SPACE= new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
- protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
- int MAX_WRITE_BATCH_SIZE = 1024*1024*4;
-
- static public class WriteKey {
- private final int file;
- private final long offset;
- private final int hash;
-
- public WriteKey(Location item){
- file = item.getDataFileId();
- offset = item.getOffset();
- // TODO: see if we can build a better hash
- hash = (int) (file ^ offset);
- }
-
- public int hashCode() {
- return hash;
- }
-
- public boolean equals(Object obj){
- if(obj instanceof WriteKey){
- WriteKey di=(WriteKey)obj;
- return di.file==file&&di.offset==offset;
+
+ protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
+ protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
+ int MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
+
+ static public class WriteKey {
+ private final int file;
+ private final long offset;
+ private final int hash;
+
+ public WriteKey(Location item) {
+ file = item.getDataFileId();
+ offset = item.getOffset();
+ // TODO: see if we can build a better hash
+ hash = (int)(file ^ offset);
+ }
+
+ public int hashCode() {
+ return hash;
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof WriteKey) {
+ WriteKey di = (WriteKey)obj;
+ return di.file == file && di.offset == offset;
}
return false;
}
- }
-
- public class WriteBatch {
-
- public final DataFile dataFile;
- public final WriteCommand first;
- public final CountDownLatch latch = new CountDownLatch(1);
- public int size;
-
- public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
- this.dataFile=dataFile;
- this.first=write;
- size+=write.location.getSize();
- }
-
- public boolean canAppend(DataFile dataFile, WriteCommand write) {
- if( dataFile != this.dataFile )
- return false;
- if( size+write.location.getSize() >= MAX_WRITE_BATCH_SIZE )
- return false;
- return true;
- }
-
- public void append(WriteCommand write) throws IOException {
- this.first.getTailNode().linkAfter(write);
- size+=write.location.getSize();
- }
- }
-
- public static class WriteCommand extends LinkedNode {
- public final Location location;
- public final ByteSequence data;
- final boolean sync;
-
- public WriteCommand(Location location, ByteSequence data, boolean sync) {
- this.location = location;
- this.data = data;
- this.sync = sync;
- }
}
-
- protected final AsyncDataManager dataManager;
-
+
+ public class WriteBatch {
+
+ public final DataFile dataFile;
+ public final WriteCommand first;
+ public final CountDownLatch latch = new CountDownLatch(1);
+ public int size;
+
+ public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+ this.dataFile = dataFile;
+ this.first = write;
+ size += write.location.getSize();
+ }
+
+ public boolean canAppend(DataFile dataFile, WriteCommand write) {
+ if (dataFile != this.dataFile)
+ return false;
+ if (size + write.location.getSize() >= MAX_WRITE_BATCH_SIZE)
+ return false;
+ return true;
+ }
+
+ public void append(WriteCommand write) throws IOException {
+ this.first.getTailNode().linkAfter(write);
+ size += write.location.getSize();
+ }
+ }
+
+ public static class WriteCommand extends LinkedNode {
+ public final Location location;
+ public final ByteSequence data;
+ final boolean sync;
+
+ public WriteCommand(Location location, ByteSequence data, boolean sync) {
+ this.location = location;
+ this.data = data;
+ this.sync = sync;
+ }
+ }
+
+ protected final AsyncDataManager dataManager;
+
protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
-
+
protected final Object enqueueMutex = new Object();
- protected WriteBatch nextWriteBatch;
-
+ protected WriteBatch nextWriteBatch;
+
private boolean running;
protected boolean shutdown;
protected IOException firstAsyncException;
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
- private Thread thread;
-
+ private Thread thread;
+
/**
* Construct a Store writer
*
* @param fileId
*/
- public DataFileAppender(AsyncDataManager dataManager){
- this.dataManager=dataManager;
+ public DataFileAppender(AsyncDataManager dataManager) {
+ this.dataManager = dataManager;
this.inflightWrites = this.dataManager.getInflightWrites();
}
-
+
/**
- * @param type
+ * @param type
* @param marshaller
* @param payload
- * @param type
- * @param sync
+ * @param type
+ * @param sync
* @return
* @throws IOException
- * @throws
- * @throws
+ * @throws
+ * @throws
*/
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+
// Write the packet our internal buffer.
- int size = data.getLength()+AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
-
- final Location location=new Location();
- location.setSize(size);
+ int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+
+ final Location location = new Location();
+ location.setSize(size);
location.setType(type);
-
+
WriteBatch batch;
- WriteCommand write = new WriteCommand(location, data, sync);
+ WriteCommand write = new WriteCommand(location, data, sync);
- // Locate datafile and enqueue into the executor in sychronized block so that
- // writes get equeued onto the executor in order that they were assigned by
+ // Locate datafile and enqueue into the executor in sychronized block so
+ // that
+ // writes get equeued onto the executor in order that they were assigned
+ // by
// the data manager (which is basically just appending)
-
- synchronized(this) {
+
+ synchronized (this) {
// Find the position where this item will land at.
- DataFile dataFile=dataManager.allocateLocation(location);
- batch = enqueue(dataFile, write);
+ DataFile dataFile = dataManager.allocateLocation(location);
+ batch = enqueue(dataFile, write);
}
location.setLatch(batch.latch);
- if( sync ) {
- try {
- batch.latch.await();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- } else {
+ if (sync) {
+ try {
+ batch.latch.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ } else {
inflightWrites.put(new WriteKey(location), write);
- }
-
+ }
+
return location;
}
private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
- synchronized(enqueueMutex) {
- WriteBatch rc=null;
- if( shutdown ) {
- throw new IOException("Async Writter Thread Shutdown");
- }
- if( firstAsyncException !=null )
- throw firstAsyncException;
-
- if( !running ) {
- running=true;
- thread = new Thread() {
- public void run() {
- processQueue();
- }
- };
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.setDaemon(true);
- thread.setName("ActiveMQ Data File Writer");
- thread.start();
- }
-
- if( nextWriteBatch == null ) {
- nextWriteBatch = new WriteBatch(dataFile,write);
- rc = nextWriteBatch;
- enqueueMutex.notify();
- } else {
- // Append to current batch if possible..
- if( nextWriteBatch.canAppend(dataFile, write) ) {
- nextWriteBatch.append(write);
- rc = nextWriteBatch;
- } else {
- // Otherwise wait for the queuedCommand to be null
- try {
- while( nextWriteBatch!=null ) {
- enqueueMutex.wait();
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- if( shutdown ) {
- throw new IOException("Async Writter Thread Shutdown");
- }
-
- // Start a new batch.
- nextWriteBatch = new WriteBatch(dataFile,write);
- rc = nextWriteBatch;
- enqueueMutex.notify();
- }
- }
- return rc;
- }
- }
+ synchronized (enqueueMutex) {
+ WriteBatch rc = null;
+ if (shutdown) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+ if (firstAsyncException != null)
+ throw firstAsyncException;
+
+ if (!running) {
+ running = true;
+ thread = new Thread() {
+ public void run() {
+ processQueue();
+ }
+ };
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.setDaemon(true);
+ thread.setName("ActiveMQ Data File Writer");
+ thread.start();
+ }
+
+ if (nextWriteBatch == null) {
+ nextWriteBatch = new WriteBatch(dataFile, write);
+ rc = nextWriteBatch;
+ enqueueMutex.notify();
+ } else {
+ // Append to current batch if possible..
+ if (nextWriteBatch.canAppend(dataFile, write)) {
+ nextWriteBatch.append(write);
+ rc = nextWriteBatch;
+ } else {
+ // Otherwise wait for the queuedCommand to be null
+ try {
+ while (nextWriteBatch != null) {
+ enqueueMutex.wait();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ if (shutdown) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+
+ // Start a new batch.
+ nextWriteBatch = new WriteBatch(dataFile, write);
+ rc = nextWriteBatch;
+ enqueueMutex.notify();
+ }
+ }
+ return rc;
+ }
+ }
public void close() throws IOException {
- synchronized( enqueueMutex ) {
- if( shutdown == false ) {
- shutdown = true;
- if( running ) {
- enqueueMutex.notifyAll();
- } else {
- shutdownDone.countDown();
- }
- }
- }
-
- try {
- shutdownDone.await();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
-
+ synchronized (enqueueMutex) {
+ if (shutdown == false) {
+ shutdown = true;
+ if (running) {
+ enqueueMutex.notifyAll();
+ } else {
+ shutdownDone.countDown();
+ }
+ }
+ }
+
+ try {
+ shutdownDone.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+
}
/**
- * The async processing loop that writes to the data files and
- * does the force calls.
+ * The async processing loop that writes to the data files and does the
+ * force calls.
*
- * Since the file sync() call is the slowest of all the operations,
- * this algorithm tries to 'batch' or group together several file sync() requests
- * into a single file sync() call. The batching is accomplished attaching the
- * same CountDownLatch instance to every force request in a group.
+ * Since the file sync() call is the slowest of all the operations, this
+ * algorithm tries to 'batch' or group together several file sync() requests
+ * into a single file sync() call. The batching is accomplished attaching
+ * the same CountDownLatch instance to every force request in a group.
*
*/
protected void processQueue() {
- DataFile dataFile=null;
- RandomAccessFile file=null;
- try {
-
- DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
- while( true ) {
-
- Object o = null;
-
- // Block till we get a command.
- synchronized(enqueueMutex) {
- while( true ) {
- if( shutdown ) {
- o = SHUTDOWN_COMMAND;
- break;
- }
- if( nextWriteBatch!=null ) {
- o = nextWriteBatch;
- nextWriteBatch=null;
- break;
- }
- enqueueMutex.wait();
- }
- enqueueMutex.notify();
- }
-
-
- if( o == SHUTDOWN_COMMAND ) {
- break;
- }
-
- WriteBatch wb = (WriteBatch) o;
- if( dataFile != wb.dataFile ) {
- if( file!=null ) {
- dataFile.closeRandomAccessFile(file);
- }
- dataFile = wb.dataFile;
- file = dataFile.openRandomAccessFile(true);
- }
-
- WriteCommand write = wb.first;
-
- // Write all the data.
- // Only need to seek to first location.. all others
- // are in sequence.
- file.seek(write.location.getOffset());
-
- //
- // is it just 1 big write?
- if( wb.size == write.location.getSize() ) {
-
- // Just write it directly..
- file.writeInt(write.location.getSize());
- file.writeByte(write.location.getType());
- file.write(RESERVED_SPACE);
- file.write(AsyncDataManager.ITEM_HEAD_SOR);
- file.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
- file.write(AsyncDataManager.ITEM_HEAD_EOR);
-
- } else {
-
- // Combine the smaller writes into 1 big buffer
- while( write!=null ) {
-
- buff.writeInt(write.location.getSize());
- buff.writeByte(write.location.getType());
- buff.write(RESERVED_SPACE);
- buff.write(AsyncDataManager.ITEM_HEAD_SOR);
- buff.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
- buff.write(AsyncDataManager.ITEM_HEAD_EOR);
-
- write = (WriteCommand) write.getNext();
- }
-
- // Now do the 1 big write.
- ByteSequence sequence = buff.toByteSequence();
- file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
- buff.reset();
- }
-
- file.getFD().sync();
-
- WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
- dataManager.setLastAppendLocation( lastWrite.location );
-
- // Signal any waiting threads that the write is on disk.
- wb.latch.countDown();
-
- // Now that the data is on disk, remove the writes from the in flight
- // cache.
- write = wb.first;
- while( write!=null ) {
- if( !write.sync ) {
- inflightWrites.remove(new WriteKey(write.location));
- }
- write = (WriteCommand) write.getNext();
- }
- }
- buff.close();
- } catch (IOException e) {
- synchronized( enqueueMutex ) {
- firstAsyncException = e;
- }
- } catch (InterruptedException e) {
- } finally {
- try {
- if( file!=null ) {
- dataFile.closeRandomAccessFile(file);
- }
- } catch (IOException e) {
- }
- shutdownDone.countDown();
- }
+ DataFile dataFile = null;
+ RandomAccessFile file = null;
+ try {
+
+ DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
+ while (true) {
+
+ Object o = null;
+
+ // Block till we get a command.
+ synchronized (enqueueMutex) {
+ while (true) {
+ if (shutdown) {
+ o = SHUTDOWN_COMMAND;
+ break;
+ }
+ if (nextWriteBatch != null) {
+ o = nextWriteBatch;
+ nextWriteBatch = null;
+ break;
+ }
+ enqueueMutex.wait();
+ }
+ enqueueMutex.notify();
+ }
+
+ if (o == SHUTDOWN_COMMAND) {
+ break;
+ }
+
+ WriteBatch wb = (WriteBatch)o;
+ if (dataFile != wb.dataFile) {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ dataFile = wb.dataFile;
+ file = dataFile.openRandomAccessFile(true);
+ }
+
+ WriteCommand write = wb.first;
+
+ // Write all the data.
+ // Only need to seek to first location.. all others
+ // are in sequence.
+ file.seek(write.location.getOffset());
+
+ //
+ // is it just 1 big write?
+ if (wb.size == write.location.getSize()) {
+
+ // Just write it directly..
+ file.writeInt(write.location.getSize());
+ file.writeByte(write.location.getType());
+ file.write(RESERVED_SPACE);
+ file.write(AsyncDataManager.ITEM_HEAD_SOR);
+ file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+ file.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+ } else {
+
+ // Combine the smaller writes into 1 big buffer
+ while (write != null) {
+
+ buff.writeInt(write.location.getSize());
+ buff.writeByte(write.location.getType());
+ buff.write(RESERVED_SPACE);
+ buff.write(AsyncDataManager.ITEM_HEAD_SOR);
+ buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+ buff.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+ write = (WriteCommand)write.getNext();
+ }
+
+ // Now do the 1 big write.
+ ByteSequence sequence = buff.toByteSequence();
+ file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ buff.reset();
+ }
+
+ file.getFD().sync();
+
+ WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+ dataManager.setLastAppendLocation(lastWrite.location);
+
+ // Signal any waiting threads that the write is on disk.
+ wb.latch.countDown();
+
+ // Now that the data is on disk, remove
+ // the non-sync writes from the
+ // in-flight cache.
+ write = wb.first;
+ while (write != null) {
+ if (!write.sync) {
+ inflightWrites.remove(new WriteKey(write.location));
+ }
+ write = (WriteCommand)write.getNext();
+ }
+ }
+ buff.close();
+ } catch (IOException e) {
+ synchronized (enqueueMutex) {
+ firstAsyncException = e;
+ }
+ } catch (InterruptedException e) {
+ } finally {
+ try {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ } catch (IOException e) {
+ }
+ shutdownDone.countDown();
+ }
}
-
+
}