You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/20 19:13:51 UTC

svn commit: r477273 - in /incubator/activemq/trunk: ./ activemq-core/ activemq-core/src/main/java/org/apache/activemq/store/jpa/ activemq-core/src/main/java/org/apache/activemq/store/jpa/model/ activemq-core/src/main/java/org/apache/activemq/util/ acti...

Author: chirino
Date: Mon Nov 20 10:13:50 2006
New Revision: 477273

URL: http://svn.apache.org/viewvc?view=rev&rev=477273
Log:
Added the initial cut of a JPA based message store.


Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
Modified:
    incubator/activemq/trunk/activemq-core/pom.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
    incubator/activemq/trunk/pom.xml

Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Mon Nov 20 10:13:50 2006
@@ -156,6 +156,10 @@
       <version>1.2.24</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.openjpa</groupId>
+      <artifactId>openjpa-persistence-jdbc</artifactId>
+    </dependency>    
   </dependencies>
 
   <build>
@@ -270,6 +274,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
+        <!-- 
         <configuration>
           <tasks>
             <taskdef name="generate" classname="org.apache.activemq.openwire.tool.JavaGeneratorTask"/>
@@ -283,33 +288,38 @@
             <version>${activemq-version}</version>
           </dependency>
         </dependencies>
+         -->
+        
+        <executions>
+          <execution>
+            <phase>process-classes</phase>
+            <configuration>
+              <tasks>
+                <path id="cp">
+                  <path refid="maven.test.classpath"/>
+                  <path refid="maven.compile.classpath"/>
+                  <path refid="maven.dependency.classpath"/>
+                </path>
+                <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask">
+                  <classpath refid="cp"/>
+                </taskdef>
+                <openjpac directory="${basedir}/target/jpa-classes">
+                  <classpath refid="cp"/>
+                  <fileset dir="${basedir}/target/classes">
+                    <include name="org/apache/activemq/store/jpa/model/*.class"/>
+                  </fileset>
+                </openjpac>
+                <copy todir="${basedir}/target/classes">
+                  <fileset dir="${basedir}/target/jpa-classes"/>
+                </copy>
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
-
-      <!-- Use Gram to Gernerate the OpenWire Marshallers -->
-      <!--
-      <plugin>
-        <groupId>org.apache.activemq</groupId>
-        <artifactId>maven-gram-plugin</artifactId>
-        <version>4.1-incubator</version>
-        <configuration>
-          <scripts>
-            :GenerateJavaMarshalling.groovy: GenerateJavaTests.groovy: GenerateCSharpMarshalling.groovy:
-            GenerateCSharpClasses.groovy: GenerateCppMarshallingClasses.groovy: GenerateCppMarshallingHeaders.groovy:
-            GenerateCppHeaders.groovy: GenerateCppClasses.groovy: GenerateCMarshalling.groovy:
-          </scripts>
-          <groovyProperties>
-            <version>2</version>
-          </groovyProperties>
-        </configuration>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-openwire-generator</artifactId>
-            <version>${activemq-version}</version>
-          </dependency>
-        </dependencies>
-      </plugin>
-      -->
 
       <plugin>
         <groupId>org.codehaus.mojo</groupId>

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,194 @@
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.jpa.model.StoredMessage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+public class JPAMessageStore implements MessageStore {
+
+	protected final JPAPersistenceAdapter adapter;
+	protected final WireFormat wireFormat;
+	protected final ActiveMQDestination destination;
+	protected final String destinationName;
+    protected AtomicLong lastMessageId = new AtomicLong(-1);
+
+	public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+		this.adapter = adapter;
+		this.destination = destination;
+		this.destinationName = destination.getQualifiedName();
+		this.wireFormat = this.adapter.getWireFormat();
+	}
+
+	public void addMessage(ConnectionContext context, Message message) throws IOException {
+		
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			
+			ByteSequence sequence = wireFormat.marshal(message);
+			sequence.compact();
+			
+			StoredMessage sm = new StoredMessage();
+			sm.setDestination(destinationName);
+			sm.setId(message.getMessageId().getBrokerSequenceId());
+			sm.setMessageId(message.getMessageId().toString());
+			sm.setExiration(message.getExpiration());
+			sm.setData(sequence.data);
+		
+			manager.persist(sm);
+			
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);		
+	}
+
+	public void addMessageReference(ConnectionContext context,
+			MessageId messageId, long expirationTime, String messageRef)
+			throws IOException {
+		throw new IOException("Not implemented.");
+	}
+
+	public ActiveMQDestination getDestination() {
+		return destination;
+	}
+
+	public Message getMessage(MessageId identity) throws IOException {
+		Message rc;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			StoredMessage message=null;
+			if( identity.getBrokerSequenceId()!= 0 ) {
+				message = manager.find(StoredMessage.class, identity.getBrokerSequenceId());			
+			} else {
+				Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1");
+				query.setParameter(1, identity.toString());
+				message = (StoredMessage) query.getSingleResult();
+			}
+			
+			rc = (Message) wireFormat.unmarshal(new ByteSequence(message.getData()));
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc;
+	}
+
+	public int getMessageCount() throws IOException {
+		Integer rc;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select count(m) from StoredMessage m");
+			rc = (Integer) query.getSingleResult();
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc;
+	}
+
+	public String getMessageReference(MessageId identity) throws IOException {
+		throw new IOException("Not implemented.");
+	}
+
+	public void recover(MessageRecoveryListener container) throws Exception {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc");
+			query.setParameter(1, destinationName);
+			for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+				Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+				container.recoverMessage(message);
+	        }
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+		
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			
+			Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+			query.setParameter(1, destinationName);
+			query.setParameter(2, lastMessageId.get());
+			query.setMaxResults(maxReturned);
+			int count = 0;
+			for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+				Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+				listener.recoverMessage(message);
+				lastMessageId.set(m.getId());
+				count++;
+				if( count >= maxReturned ) { 
+					return;
+				}
+	        }
+
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void removeAllMessages(ConnectionContext context) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1");
+			query.setParameter(1, destinationName);
+			query.executeUpdate();
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);
+	}
+
+	public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			Query query = manager.createQuery("delete from StoredMessage m where m.id=?1");
+			query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
+			query.executeUpdate();
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);
+	}
+
+	public void resetBatching() {
+        lastMessageId.set(-1);
+	}
+
+	public void setUsageManager(UsageManager usageManager) {
+	}
+
+	public void start() throws Exception {
+	}
+
+	public void stop() throws Exception {
+	}
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,253 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of {@link PersistenceAdapter} that uses JPA to
+ * store it's messages.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.17 $
+ */
+public class JPAPersistenceAdapter implements PersistenceAdapter {
+
+    private static final Log log = LogFactory.getLog(JPAPersistenceAdapter.class);
+    String entityManagerName = "activemq";
+	Properties entityManagerProperties = System.getProperties();
+	EntityManagerFactory entityManagerFactory;
+	private WireFormat wireFormat;
+	private MemoryTransactionStore transactionStore;
+	
+	public void beginTransaction(ConnectionContext context) throws IOException {
+		if( context.getLongTermStoreContext()!=null )
+			throw new IOException("Transation already started.");
+		
+		EntityManager manager = getEntityManagerFactory().createEntityManager();		
+		manager.getTransaction().begin();
+		context.setLongTermStoreContext(manager);
+	}
+
+	public void commitTransaction(ConnectionContext context) throws IOException {
+		EntityManager manager = (EntityManager) context.getLongTermStoreContext();
+		if( manager==null )
+			throw new IOException("Transation not started.");
+		context.setLongTermStoreContext(null);
+		manager.getTransaction().commit();
+		manager.close();
+	}
+	
+	public void rollbackTransaction(ConnectionContext context) throws IOException {
+		EntityManager manager = (EntityManager) context.getLongTermStoreContext();
+		if( manager==null )
+			throw new IOException("Transation not started.");
+		context.setLongTermStoreContext(null);
+		manager.getTransaction().rollback();
+		manager.close();
+	}
+	
+	public EntityManager beginEntityManager(ConnectionContext context) {
+		if( context==null || context.getLongTermStoreContext()==null ) {
+			EntityManager manager = getEntityManagerFactory().createEntityManager();		
+			manager.getTransaction().begin();
+			return manager;
+		} else {
+			return (EntityManager) context.getLongTermStoreContext();
+		}
+	}
+	
+	public void commitEntityManager(ConnectionContext context, EntityManager manager) {		
+		if( context==null || context.getLongTermStoreContext()==null ) {
+			manager.getTransaction().commit();
+			manager.close();
+		} 
+	}
+	
+	public void rollbackEntityManager(ConnectionContext context, EntityManager manager) {		
+		if( context==null || context.getLongTermStoreContext()==null ) {
+			manager.getTransaction().rollback();
+			manager.close();
+		} 
+	}
+
+	public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+		MessageStore rc =  new JPAMessageStore(this, destination);
+        if (transactionStore != null) {
+            rc = transactionStore.proxy(rc);
+        }
+        return rc;
+	}
+
+	public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+		TopicMessageStore rc = new JPATopicMessageStore(this, destination);
+        if (transactionStore != null) {
+            rc = transactionStore.proxy(rc);
+        }
+        return rc;
+	}
+
+	public TransactionStore createTransactionStore() throws IOException {
+        if (transactionStore == null) {
+            transactionStore = new MemoryTransactionStore();
+        }
+        return this.transactionStore;
+	}
+
+	public void deleteAllMessages() throws IOException {
+		EntityManager manager = beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("delete from StoredMessage m");
+			query.executeUpdate();
+			query = manager.createQuery("delete from StoredSubscription ss");
+			query.executeUpdate();
+		} catch (Throwable e) {
+			rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		commitEntityManager(null,manager);		
+	}
+
+	public Set getDestinations() {
+		HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
+		
+		EntityManager manager = beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select distinct m.destination from StoredMessage m");
+			for (String dest : (List<String>)query.getResultList()) {
+				rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE));
+	        }
+		} catch (RuntimeException e) {
+			rollbackEntityManager(null,manager);
+			throw e;
+		}
+		commitEntityManager(null,manager);		
+		return rc;
+	}
+
+	public long getLastMessageBrokerSequenceId() throws IOException {
+		long rc=0;
+		EntityManager manager = beginEntityManager(null);
+		try {
+			Query query = manager.createQuery("select max(m.id) from StoredMessage m");
+			Long t = (Long) query.getSingleResult();
+			if( t != null ) {
+				rc = t;
+			}
+		} catch (Throwable e) {
+			rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		commitEntityManager(null,manager);		
+		return rc;
+	}
+
+	public boolean isUseExternalMessageReferences() {
+		return false;
+	}
+
+	public void setUsageManager(UsageManager usageManager) {
+	}
+
+	public void setUseExternalMessageReferences(boolean enable) {
+		if( enable ) {
+			throw new IllegalArgumentException("This persistence adapter does not support externa message references");
+		}
+	}
+
+	public void start() throws Exception {
+	}
+
+	public void stop() throws Exception {
+		if( entityManagerFactory !=null ) {
+			entityManagerFactory.close();
+		}
+	}
+
+	public EntityManagerFactory getEntityManagerFactory() {
+		if( entityManagerFactory == null ) {
+			entityManagerFactory = createEntityManagerFactory();
+		}
+		return entityManagerFactory;
+	}
+	protected EntityManagerFactory createEntityManagerFactory() {
+		return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties());
+	}
+
+	public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
+		this.entityManagerFactory = entityManagerFactory;
+	}
+
+	public Properties getEntityManagerProperties() {
+		return entityManagerProperties;
+	}
+	public void setEntityManagerProperties(
+			Properties entityManagerProperties) {
+		this.entityManagerProperties = entityManagerProperties;
+	}
+
+	public String getEntityManagerName() {
+		return entityManagerName;
+	}
+	public void setEntityManagerName(String entityManager) {
+		this.entityManagerName = entityManager;
+	}
+
+	public WireFormat getWireFormat() {
+		if(wireFormat==null) {
+			wireFormat = createWireFormat();
+		}
+		return wireFormat;
+	}
+
+	private WireFormat createWireFormat() {
+		OpenWireFormatFactory wff = new OpenWireFormatFactory();
+		return wff.createWireFormat(); 
+	}
+
+	public void setWireFormat(WireFormat wireFormat) {
+		this.wireFormat = wireFormat;
+	}
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,233 @@
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.jpa.model.StoredMessage;
+import org.apache.activemq.store.jpa.model.StoredSubscription;
+import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+
+public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore {
+    private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>();
+
+	public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+		super(adapter, destination);
+	}
+
+	public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(context);
+		try {
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			ss.setLastAckedId(messageId.getBrokerSequenceId());
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(context,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(context,manager);
+	}
+
+	public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {			
+			StoredSubscription ss = new StoredSubscription();
+			ss.setClientId(clientId);
+			ss.setSubscriptionName(subscriptionName);
+			ss.setDestination(destinationName);
+			ss.setSelector(selector);
+			ss.setLastAckedId(-1);
+			
+			if( !retroactive ) {
+				Query query = manager.createQuery("select max(m.id) from StoredMessage m");
+				Long rc = (Long) query.getSingleResult();
+				if( rc != null ) {
+					ss.setLastAckedId(rc);
+				}
+			}
+			
+			manager.persist(ss);
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {			
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			manager.remove(ss);
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
+		Query query = manager.createQuery(
+				"select ss from StoredSubscription ss " +
+				"where ss.clientId=?1 " +
+				"and ss.subscriptionName=?2 " +
+				"and ss.destination=?3");
+		query.setParameter(1, clientId);
+		query.setParameter(2, subscriptionName);
+		query.setParameter(3, destinationName);
+		List<StoredSubscription> resultList = query.getResultList();
+		if( resultList.isEmpty() )
+			return null;
+		return resultList.get(0); 
+	}
+
+	public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+		SubscriptionInfo rc[];
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
+			
+			Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
+			query.setParameter(1, destinationName);
+			for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
+				SubscriptionInfo info = new SubscriptionInfo();
+				info.setClientId(ss.getClientId());
+				info.setDestination(destination);
+				info.setSelector(ss.getSelector());
+				info.setSubcriptionName(ss.getSubscriptionName());
+				l.add(info);
+	        }
+			
+			rc = new SubscriptionInfo[l.size()];
+			l.toArray(rc);
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);		
+		return rc;
+	}
+
+	public int getMessageCount(String clientId, String subscriptionName) throws IOException {
+		Integer rc;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {	
+			Query query = manager.createQuery(
+					"select count(m) FROM StoredMessage m, StoredSubscription ss " +
+					"where ss.clientId=?1 " +
+					"and   ss.subscriptionName=?2 " +
+					"and   ss.destination=?3 " +
+					"and   m.desination=ss.destination and m.id > ss.lastAckedId");
+			query.setParameter(1, clientId);
+			query.setParameter(2, subscriptionName);
+			query.setParameter(2, destinationName);
+	        rc = (Integer) query.getSingleResult();	        
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc;
+	}
+
+	public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+		SubscriptionInfo rc=null;
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {			
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			if( ss != null ) {
+				rc = new SubscriptionInfo();
+				rc.setClientId(ss.getClientId());
+				rc.setDestination(destination);
+				rc.setSelector(ss.getSelector());
+				rc.setSubcriptionName(ss.getSubscriptionName());
+			}
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+		return rc;
+	}
+
+	public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+			SubscriptionId id = new SubscriptionId();
+			id.setClientId(clientId);
+			id.setSubscriptionName(subscriptionName);
+			id.setDestination(destinationName);
+	
+			AtomicLong last=subscriberLastMessageMap.get(id);
+	        if(last==null){
+	    		StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+	            last=new AtomicLong(ss.getLastAckedId());
+	            subscriberLastMessageMap.put(id,last);
+	        }
+	        final AtomicLong lastMessageId=last;
+			
+			Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+			query.setParameter(1, destinationName);
+			query.setParameter(2, lastMessageId.get());
+			query.setMaxResults(maxReturned);
+			int count = 0;
+			for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+				Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+				listener.recoverMessage(message);
+				lastMessageId.set(m.getId());
+				count++;
+				if( count >= maxReturned ) { 
+					return;
+				}
+	        }        
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+		EntityManager manager = adapter.beginEntityManager(null);
+		try {
+	
+			StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+			
+			Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+			query.setParameter(1, destinationName);
+			query.setParameter(2, ss.getLastAckedId());
+			for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+				Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+				listener.recoverMessage(message);
+	        }
+		} catch (Throwable e) {
+			adapter.rollbackEntityManager(null,manager);
+			throw IOExceptionSupport.create(e);
+		}
+		adapter.commitEntityManager(null,manager);
+	}
+
+	public void resetBatching(String clientId, String subscriptionName) {
+		SubscriptionId id = new SubscriptionId();
+		id.setClientId(clientId);
+		id.setSubscriptionName(subscriptionName);
+		id.setDestination(destinationName);
+
+        subscriberLastMessageMap.remove(id);
+	}
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa.model;
+
+import javax.persistence.Basic;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+/** 
+ */
+@Entity
+public class StoredMessage {
+	
+    @Id
+    private long id;
+	
+    @Basic
+    private String messageId;
+
+    @Basic
+    private String destination;
+
+    @Basic
+    private long exiration;
+
+    @Basic
+    private byte[] data;
+
+    public StoredMessage() {
+    }
+
+	public byte[] getData() {
+		return data;
+	}
+
+	public void setData(byte[] data) {
+		this.data = data;
+	}
+
+	public String getDestination() {
+		return destination;
+	}
+
+	public void setDestination(String destination) {
+		this.destination = destination;
+	}
+
+	public long getExiration() {
+		return exiration;
+	}
+
+	public void setExiration(long exiration) {
+		this.exiration = exiration;
+	}
+
+	public String getMessageId() {
+		return messageId;
+	}
+
+	public void setMessageId(String messageId) {
+		this.messageId = messageId;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long sequenceId) {
+		this.id = sequenceId;
+	}
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa.model;
+
+import javax.persistence.Basic;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+
+/** 
+ */
+@Entity
+public class StoredSubscription {
+		
+	/**
+     * Application identity class for Magazine.
+     */
+    public static class SubscriptionId {
+
+	    public String destination;
+	    public String clientId;
+	    public String subscriptionName;
+
+        public boolean equals(Object other) {
+            if (other == this)
+                return true;
+            if (!(other instanceof SubscriptionId))
+                return false;
+    
+            SubscriptionId sid = (SubscriptionId) other;
+            return (destination == sid.destination || (destination != null && destination.equals(sid.destination)))
+                && (clientId == sid.clientId || (clientId != null && clientId.equals(sid.clientId)))
+                && (subscriptionName == sid.subscriptionName || (subscriptionName != null && subscriptionName.equals(sid.subscriptionName)));
+        }
+     
+        /**
+         * Hashcode must also depend on identity values.
+         */
+        public int hashCode() {
+            return ((destination == null) ? 0 : destination.hashCode())
+                ^ ((clientId == null) ? 0 : clientId.hashCode())
+                ^ ((subscriptionName == null) ? 0 : subscriptionName.hashCode())
+                ;
+        } 
+
+        public String toString() {
+            return destination + ":" + clientId + ":" + subscriptionName;
+        }
+
+		public String getClientId() {
+			return clientId;
+		}
+
+		public void setClientId(String clientId) {
+			this.clientId = clientId;
+		}
+
+		public String getDestination() {
+			return destination;
+		}
+
+		public void setDestination(String destination) {
+			this.destination = destination;
+		}
+
+		public String getSubscriptionName() {
+			return subscriptionName;
+		}
+
+		public void setSubscriptionName(String subscriptionName) {
+			this.subscriptionName = subscriptionName;
+		}
+    }
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.AUTO) 
+    private long id;
+    
+    @Basic
+    private String destination;
+    @Basic
+    private String clientId;
+    @Basic
+    private String subscriptionName;
+    
+    @Basic
+    private long lastAckedId;
+    @Basic
+    private String selector;
+
+
+	public long getLastAckedId() {
+		return lastAckedId;
+	}
+
+	public void setLastAckedId(long lastAckedId) {
+		this.lastAckedId = lastAckedId;
+	}
+
+	public String getSelector() {
+		return selector;
+	}
+
+	public void setSelector(String selector) {
+		this.selector = selector;
+	}
+
+	public String getDestination() {
+		return destination;
+	}
+
+	public void setDestination(String destination) {
+		this.destination = destination;
+	}
+
+	public String getClientId() {
+		return clientId;
+	}
+
+	public void setClientId(String clientId) {
+		this.clientId = clientId;
+	}
+
+	public String getSubscriptionName() {
+		return subscriptionName;
+	}
+
+	public void setSubscriptionName(String subscriptionName) {
+		this.subscriptionName = subscriptionName;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java Mon Nov 20 10:13:50 2006
@@ -54,5 +54,14 @@
 	public void setOffset(int offset) {
 		this.offset = offset;
 	}
+	
+	public void compact() {
+		if( length != data.length ) {
+			byte t[] = new byte[length];
+			System.arraycopy(data, offset, t, 0, length);
+			data=t;
+			offset=0;
+		}
+	}
 
 }

Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml Mon Nov 20 10:13:50 2006
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2006 The Apache Software Foundation.
+ 
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    version="1.0">
+    <persistence-unit name="activemq" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+        <class>org.apache.activemq.store.jpa.model.StoredMessage</class>
+        <class>org.apache.activemq.store.jpa.model.StoredSubscription</class>
+        <!-- 
+        <class>org.apache.activemq.store.jpa.model.StoredSubscription$SubscriptionId</class>
+         -->
+    </persistence-unit>
+</persistence>

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.util.Properties;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.store.jpa.JPAPersistenceAdapter;
+
+/**
+ * Used to verify that recovery works correctly against 
+ * 
+ * @version $Revision$
+ */
+public class JPARecoveryBrokerTest extends RecoveryBrokerTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setDeleteAllMessagesOnStartup(true);
+        JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        pa.setEntityManagerProperties(props);
+        service.setPersistenceAdapter(pa);
+        return service;
+        
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        pa.setEntityManagerProperties(props);
+        service.setPersistenceAdapter(pa);
+        return service;
+    }
+    
+    public static Test suite() {
+        return suite(JPARecoveryBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Modified: incubator/activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/pom.xml?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/pom.xml (original)
+++ incubator/activemq/trunk/pom.xml Mon Nov 20 10:13:50 2006
@@ -314,6 +314,12 @@
         <version>${commons-collections-version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.openjpa</groupId>
+        <artifactId>openjpa-persistence-jdbc</artifactId>
+        <version>${openjpa-version}</version>
+      </dependency>    
+
       <!-- Optional Spring Support -->
       <dependency>
         <groupId>org.springframework</groupId>
@@ -868,8 +874,9 @@
     <axis-version>1.2-RC1</axis-version>
     <cglib-version>2.0</cglib-version>
     <commons-beanutils-version>1.6.1</commons-beanutils-version>
-    <commons-collections-version>2.1</commons-collections-version>
-    <commons-dbcp-version>1.2</commons-dbcp-version>
+    <commons-collections-version>3.1</commons-collections-version>
+    <openjpa-version>0.9.6-incubating</openjpa-version>
+    <commons-dbcp-version>1.2.1</commons-dbcp-version>
     <commons-httpclient-version>2.0.1</commons-httpclient-version>
     <commons-logging-version>1.1</commons-logging-version>
     <commons-pool-version>1.2</commons-pool-version>
@@ -887,7 +894,7 @@
     <junit-version>3.8.1</junit-version>
     <jxta-version>2.0</jxta-version>
     <log4j-version>1.2.12</log4j-version>
-    <org-apache-derby-version>10.1.1.0</org-apache-derby-version>
+    <org-apache-derby-version>10.1.3.1</org-apache-derby-version>
     <org-apache-geronimo-specs-version>1.0</org-apache-geronimo-specs-version>
     <org-apache-maven-surefire-plugin-version>2.2</org-apache-maven-surefire-plugin-version>
     <p2psockets-version>1.1.2</p2psockets-version>