You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/20 19:13:51 UTC
svn commit: r477273 - in /incubator/activemq/trunk: ./ activemq-core/
activemq-core/src/main/java/org/apache/activemq/store/jpa/
activemq-core/src/main/java/org/apache/activemq/store/jpa/model/
activemq-core/src/main/java/org/apache/activemq/util/ acti...
Author: chirino
Date: Mon Nov 20 10:13:50 2006
New Revision: 477273
URL: http://svn.apache.org/viewvc?view=rev&rev=477273
Log:
Added the initial cut of a JPA based message store.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
Modified:
incubator/activemq/trunk/activemq-core/pom.xml
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
incubator/activemq/trunk/pom.xml
Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Mon Nov 20 10:13:50 2006
@@ -156,6 +156,10 @@
<version>1.2.24</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence-jdbc</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -270,6 +274,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
+ <!--
<configuration>
<tasks>
<taskdef name="generate" classname="org.apache.activemq.openwire.tool.JavaGeneratorTask"/>
@@ -283,33 +288,38 @@
<version>${activemq-version}</version>
</dependency>
</dependencies>
+ -->
+
+ <executions>
+ <execution>
+ <phase>process-classes</phase>
+ <configuration>
+ <tasks>
+ <path id="cp">
+ <path refid="maven.test.classpath"/>
+ <path refid="maven.compile.classpath"/>
+ <path refid="maven.dependency.classpath"/>
+ </path>
+ <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask">
+ <classpath refid="cp"/>
+ </taskdef>
+ <openjpac directory="${basedir}/target/jpa-classes">
+ <classpath refid="cp"/>
+ <fileset dir="${basedir}/target/classes">
+ <include name="org/apache/activemq/store/jpa/model/*.class"/>
+ </fileset>
+ </openjpac>
+ <copy todir="${basedir}/target/classes">
+ <fileset dir="${basedir}/target/jpa-classes"/>
+ </copy>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
-
- <!-- Use Gram to Gernerate the OpenWire Marshallers -->
- <!--
- <plugin>
- <groupId>org.apache.activemq</groupId>
- <artifactId>maven-gram-plugin</artifactId>
- <version>4.1-incubator</version>
- <configuration>
- <scripts>
- :GenerateJavaMarshalling.groovy: GenerateJavaTests.groovy: GenerateCSharpMarshalling.groovy:
- GenerateCSharpClasses.groovy: GenerateCppMarshallingClasses.groovy: GenerateCppMarshallingHeaders.groovy:
- GenerateCppHeaders.groovy: GenerateCppClasses.groovy: GenerateCMarshalling.groovy:
- </scripts>
- <groovyProperties>
- <version>2</version>
- </groovyProperties>
- </configuration>
- <dependencies>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-openwire-generator</artifactId>
- <version>${activemq-version}</version>
- </dependency>
- </dependencies>
- </plugin>
- -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,194 @@
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.jpa.model.StoredMessage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+public class JPAMessageStore implements MessageStore {
+
+ protected final JPAPersistenceAdapter adapter;
+ protected final WireFormat wireFormat;
+ protected final ActiveMQDestination destination;
+ protected final String destinationName;
+ protected AtomicLong lastMessageId = new AtomicLong(-1);
+
+ public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+ this.adapter = adapter;
+ this.destination = destination;
+ this.destinationName = destination.getQualifiedName();
+ this.wireFormat = this.adapter.getWireFormat();
+ }
+
+ public void addMessage(ConnectionContext context, Message message) throws IOException {
+
+ EntityManager manager = adapter.beginEntityManager(context);
+ try {
+
+ ByteSequence sequence = wireFormat.marshal(message);
+ sequence.compact();
+
+ StoredMessage sm = new StoredMessage();
+ sm.setDestination(destinationName);
+ sm.setId(message.getMessageId().getBrokerSequenceId());
+ sm.setMessageId(message.getMessageId().toString());
+ sm.setExiration(message.getExpiration());
+ sm.setData(sequence.data);
+
+ manager.persist(sm);
+
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(context,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(context,manager);
+ }
+
+ public void addMessageReference(ConnectionContext context,
+ MessageId messageId, long expirationTime, String messageRef)
+ throws IOException {
+ throw new IOException("Not implemented.");
+ }
+
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ /**
+  * Loads a single message from the store.
+  *
+  * The row is looked up by broker sequence id when the supplied id carries
+  * a non-zero one; otherwise it is looked up by the string form of the
+  * message id.
+  *
+  * @param identity the id of the message to load
+  * @return the unmarshalled message, or null when no matching row exists
+  * @throws IOException if the lookup or the unmarshalling fails
+  */
+ public Message getMessage(MessageId identity) throws IOException {
+     Message rc = null;
+     EntityManager manager = adapter.beginEntityManager(null);
+     try {
+         StoredMessage message = null;
+         if( identity.getBrokerSequenceId()!= 0 ) {
+             // find() yields null when there is no entity with this key
+             message = manager.find(StoredMessage.class, identity.getBrokerSequenceId());
+         } else {
+             Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1");
+             query.setParameter(1, identity.toString());
+             // getResultList() instead of getSingleResult(): the latter throws when nothing matches
+             List<StoredMessage> matches = (List<StoredMessage>)query.getResultList();
+             if( !matches.isEmpty() ) {
+                 message = matches.get(0);
+             }
+         }
+
+         // Only unmarshal when a row was found; a missing message is reported as null
+         if( message != null ) {
+             rc = (Message) wireFormat.unmarshal(new ByteSequence(message.getData()));
+         }
+     } catch (Throwable e) {
+         adapter.rollbackEntityManager(null,manager);
+         throw IOExceptionSupport.create(e);
+     }
+     adapter.commitEntityManager(null,manager);
+     return rc;
+ }
+
+ /**
+  * Counts the messages stored for this store's destination.
+  *
+  * @return the number of stored messages whose destination matches this store
+  * @throws IOException if the query fails
+  */
+ public int getMessageCount() throws IOException {
+     int rc;
+     EntityManager manager = adapter.beginEntityManager(null);
+     try {
+         // Restrict the count to this destination, like every other query in this store
+         Query query = manager.createQuery("select count(m) from StoredMessage m where m.destination=?1");
+         query.setParameter(1, destinationName);
+         // The count aggregate is not an Integer; go through Number to avoid a ClassCastException
+         rc = ((Number) query.getSingleResult()).intValue();
+     } catch (Throwable e) {
+         adapter.rollbackEntityManager(null,manager);
+         throw IOExceptionSupport.create(e);
+     }
+     adapter.commitEntityManager(null,manager);
+     return rc;
+ }
+
+ public String getMessageReference(MessageId identity) throws IOException {
+ throw new IOException("Not implemented.");
+ }
+
+ public void recover(MessageRecoveryListener container) throws Exception {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc");
+ query.setParameter(1, destinationName);
+ for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+ Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+ container.recoverMessage(message);
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null,manager);
+ }
+
+ /**
+  * Replays the next batch of stored messages for this destination, in
+  * ascending id order, starting after the last id handed out by a previous
+  * call (or from the beginning after resetBatching()).
+  *
+  * @param maxReturned the maximum number of messages to pass to the listener
+  * @param listener receives each recovered message
+  * @throws Exception if the query, the unmarshalling or the listener fails
+  */
+ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+     EntityManager manager = adapter.beginEntityManager(null);
+     try {
+         Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+         query.setParameter(1, destinationName);
+         query.setParameter(2, lastMessageId.get());
+         query.setMaxResults(maxReturned);
+         int count = 0;
+         for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+             Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+             listener.recoverMessage(message);
+             // Remember how far we got so the next batch resumes after this message
+             lastMessageId.set(m.getId());
+             count++;
+             if( count >= maxReturned ) {
+                 // Leave the loop instead of returning, so the commit below
+                 // still runs and the EntityManager is released
+                 break;
+             }
+         }
+     } catch (Throwable e) {
+         adapter.rollbackEntityManager(null,manager);
+         throw IOExceptionSupport.create(e);
+     }
+     adapter.commitEntityManager(null,manager);
+ }
+
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(context);
+ try {
+ Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1");
+ query.setParameter(1, destinationName);
+ query.executeUpdate();
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(context,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(context,manager);
+ }
+
+ public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(context);
+ try {
+ Query query = manager.createQuery("delete from StoredMessage m where m.id=?1");
+ query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
+ query.executeUpdate();
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(context,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(context,manager);
+ }
+
+ public void resetBatching() {
+ lastMessageId.set(-1);
+ }
+
+ public void setUsageManager(UsageManager usageManager) {
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,253 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of {@link PersistenceAdapter} that uses JPA to
+ * store its messages.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.17 $
+ */
+public class JPAPersistenceAdapter implements PersistenceAdapter {
+
+ private static final Log log = LogFactory.getLog(JPAPersistenceAdapter.class);
+ String entityManagerName = "activemq";
+ Properties entityManagerProperties = System.getProperties();
+ EntityManagerFactory entityManagerFactory;
+ private WireFormat wireFormat;
+ private MemoryTransactionStore transactionStore;
+
+ /**
+  * Starts a store transaction for the given connection context by creating
+  * an EntityManager, beginning its transaction and attaching the manager to
+  * the context as its long term store context.
+  *
+  * @param context the connection context that will own the transaction
+  * @throws IOException if the context already has a transaction attached
+  */
+ public void beginTransaction(ConnectionContext context) throws IOException {
+     if( context.getLongTermStoreContext()!=null )
+         throw new IOException("Transaction already started.");
+
+     EntityManager manager = getEntityManagerFactory().createEntityManager();
+     try {
+         manager.getTransaction().begin();
+     } catch (RuntimeException e) {
+         // Nobody else holds a reference to the manager yet, so release it here
+         manager.close();
+         throw e;
+     }
+     context.setLongTermStoreContext(manager);
+ }
+
+ /**
+  * Commits the transaction attached to the given connection context and
+  * detaches it from the context.
+  *
+  * @param context the connection context that owns the transaction
+  * @throws IOException if no transaction is attached to the context
+  */
+ public void commitTransaction(ConnectionContext context) throws IOException {
+     EntityManager manager = (EntityManager) context.getLongTermStoreContext();
+     if( manager==null )
+         throw new IOException("Transaction not started.");
+     context.setLongTermStoreContext(null);
+     try {
+         manager.getTransaction().commit();
+     } finally {
+         // The context no longer references the manager, so it must be
+         // closed here even when the commit fails
+         manager.close();
+     }
+ }
+
+ /**
+  * Rolls back the transaction attached to the given connection context and
+  * detaches it from the context.
+  *
+  * @param context the connection context that owns the transaction
+  * @throws IOException if no transaction is attached to the context
+  */
+ public void rollbackTransaction(ConnectionContext context) throws IOException {
+     EntityManager manager = (EntityManager) context.getLongTermStoreContext();
+     if( manager==null )
+         throw new IOException("Transaction not started.");
+     context.setLongTermStoreContext(null);
+     try {
+         manager.getTransaction().rollback();
+     } finally {
+         // The context no longer references the manager, so it must be
+         // closed here even when the rollback fails
+         manager.close();
+     }
+ }
+
+ public EntityManager beginEntityManager(ConnectionContext context) {
+ if( context==null || context.getLongTermStoreContext()==null ) {
+ EntityManager manager = getEntityManagerFactory().createEntityManager();
+ manager.getTransaction().begin();
+ return manager;
+ } else {
+ return (EntityManager) context.getLongTermStoreContext();
+ }
+ }
+
+ /**
+  * Completes a unit of work started with beginEntityManager.
+  *
+  * When the context is null or has no long term store context attached,
+  * the manager's transaction is committed and the manager is closed.
+  * Otherwise this method does nothing.
+  *
+  * @param context the connection context, may be null
+  * @param manager the manager returned by beginEntityManager
+  */
+ public void commitEntityManager(ConnectionContext context, EntityManager manager) {
+     if( context==null || context.getLongTermStoreContext()==null ) {
+         try {
+             manager.getTransaction().commit();
+         } finally {
+             // Always release the manager, even if the commit fails
+             manager.close();
+         }
+     }
+ }
+
+ /**
+  * Abandons a unit of work started with beginEntityManager.
+  *
+  * When the context is null or has no long term store context attached,
+  * the manager's transaction is rolled back and the manager is closed.
+  * Otherwise this method does nothing.
+  *
+  * @param context the connection context, may be null
+  * @param manager the manager returned by beginEntityManager
+  */
+ public void rollbackEntityManager(ConnectionContext context, EntityManager manager) {
+     if( context==null || context.getLongTermStoreContext()==null ) {
+         try {
+             manager.getTransaction().rollback();
+         } finally {
+             // Always release the manager, even if the rollback fails
+             manager.close();
+         }
+     }
+ }
+
+ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+ MessageStore rc = new JPAMessageStore(this, destination);
+ if (transactionStore != null) {
+ rc = transactionStore.proxy(rc);
+ }
+ return rc;
+ }
+
+ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+ TopicMessageStore rc = new JPATopicMessageStore(this, destination);
+ if (transactionStore != null) {
+ rc = transactionStore.proxy(rc);
+ }
+ return rc;
+ }
+
+ public TransactionStore createTransactionStore() throws IOException {
+ if (transactionStore == null) {
+ transactionStore = new MemoryTransactionStore();
+ }
+ return this.transactionStore;
+ }
+
+ public void deleteAllMessages() throws IOException {
+ EntityManager manager = beginEntityManager(null);
+ try {
+ Query query = manager.createQuery("delete from StoredMessage m");
+ query.executeUpdate();
+ query = manager.createQuery("delete from StoredSubscription ss");
+ query.executeUpdate();
+ } catch (Throwable e) {
+ rollbackEntityManager(null,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ commitEntityManager(null,manager);
+ }
+
+ public Set getDestinations() {
+ HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
+
+ EntityManager manager = beginEntityManager(null);
+ try {
+ Query query = manager.createQuery("select distinct m.destination from StoredMessage m");
+ for (String dest : (List<String>)query.getResultList()) {
+ rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE));
+ }
+ } catch (RuntimeException e) {
+ rollbackEntityManager(null,manager);
+ throw e;
+ }
+ commitEntityManager(null,manager);
+ return rc;
+ }
+
+ public long getLastMessageBrokerSequenceId() throws IOException {
+ long rc=0;
+ EntityManager manager = beginEntityManager(null);
+ try {
+ Query query = manager.createQuery("select max(m.id) from StoredMessage m");
+ Long t = (Long) query.getSingleResult();
+ if( t != null ) {
+ rc = t;
+ }
+ } catch (Throwable e) {
+ rollbackEntityManager(null,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ commitEntityManager(null,manager);
+ return rc;
+ }
+
+ public boolean isUseExternalMessageReferences() {
+ return false;
+ }
+
+ public void setUsageManager(UsageManager usageManager) {
+ }
+
+ public void setUseExternalMessageReferences(boolean enable) {
+ if( enable ) {
+ throw new IllegalArgumentException("This persistence adapter does not support externa message references");
+ }
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ if( entityManagerFactory !=null ) {
+ entityManagerFactory.close();
+ }
+ }
+
+ public EntityManagerFactory getEntityManagerFactory() {
+ if( entityManagerFactory == null ) {
+ entityManagerFactory = createEntityManagerFactory();
+ }
+ return entityManagerFactory;
+ }
+ protected EntityManagerFactory createEntityManagerFactory() {
+ return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties());
+ }
+
+ public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
+ this.entityManagerFactory = entityManagerFactory;
+ }
+
+ public Properties getEntityManagerProperties() {
+ return entityManagerProperties;
+ }
+ public void setEntityManagerProperties(
+ Properties entityManagerProperties) {
+ this.entityManagerProperties = entityManagerProperties;
+ }
+
+ public String getEntityManagerName() {
+ return entityManagerName;
+ }
+ public void setEntityManagerName(String entityManager) {
+ this.entityManagerName = entityManager;
+ }
+
+ public WireFormat getWireFormat() {
+ if(wireFormat==null) {
+ wireFormat = createWireFormat();
+ }
+ return wireFormat;
+ }
+
+ private WireFormat createWireFormat() {
+ OpenWireFormatFactory wff = new OpenWireFormatFactory();
+ return wff.createWireFormat();
+ }
+
+ public void setWireFormat(WireFormat wireFormat) {
+ this.wireFormat = wireFormat;
+ }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,233 @@
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.jpa.model.StoredMessage;
+import org.apache.activemq.store.jpa.model.StoredSubscription;
+import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+
+public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore {
+ private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>();
+
+ public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+ super(adapter, destination);
+ }
+
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(context);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ ss.setLastAckedId(messageId.getBrokerSequenceId());
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(context,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(context,manager);
+ }
+
+ public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = new StoredSubscription();
+ ss.setClientId(clientId);
+ ss.setSubscriptionName(subscriptionName);
+ ss.setDestination(destinationName);
+ ss.setSelector(selector);
+ ss.setLastAckedId(-1);
+
+ if( !retroactive ) {
+ Query query = manager.createQuery("select max(m.id) from StoredMessage m");
+ Long rc = (Long) query.getSingleResult();
+ if( rc != null ) {
+ ss.setLastAckedId(rc);
+ }
+ }
+
+ manager.persist(ss);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null,manager);
+ }
+
+ /**
+  * Removes the stored subscription for the given client and subscription
+  * name on this destination. Does nothing when no such subscription is
+  * stored.
+  *
+  * @param clientId the client id of the subscriber
+  * @param subscriptionName the name of the subscription
+  * @throws IOException if the lookup or the removal fails
+  */
+ public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+     EntityManager manager = adapter.beginEntityManager(null);
+     try {
+         StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+         // findStoredSubscription returns null when nothing matches;
+         // EntityManager.remove() must not be handed a null entity
+         if( ss != null ) {
+             manager.remove(ss);
+         }
+     } catch (Throwable e) {
+         adapter.rollbackEntityManager(null,manager);
+         throw IOExceptionSupport.create(e);
+     }
+     adapter.commitEntityManager(null,manager);
+ }
+
+ private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
+ Query query = manager.createQuery(
+ "select ss from StoredSubscription ss " +
+ "where ss.clientId=?1 " +
+ "and ss.subscriptionName=?2 " +
+ "and ss.destination=?3");
+ query.setParameter(1, clientId);
+ query.setParameter(2, subscriptionName);
+ query.setParameter(3, destinationName);
+ List<StoredSubscription> resultList = query.getResultList();
+ if( resultList.isEmpty() )
+ return null;
+ return resultList.get(0);
+ }
+
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+ SubscriptionInfo rc[];
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
+
+ Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
+ query.setParameter(1, destinationName);
+ for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.setClientId(ss.getClientId());
+ info.setDestination(destination);
+ info.setSelector(ss.getSelector());
+ info.setSubcriptionName(ss.getSubscriptionName());
+ l.add(info);
+ }
+
+ rc = new SubscriptionInfo[l.size()];
+ l.toArray(rc);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null,manager);
+ return rc;
+ }
+
+ /**
+  * Counts the messages on this destination that the given subscription has
+  * not acknowledged yet, i.e. those with an id greater than the
+  * subscription's last acked id.
+  *
+  * @param clientId the client id of the subscriber
+  * @param subscriptionName the name of the subscription
+  * @return the number of stored messages past the subscription's last acked id
+  * @throws IOException if the query fails
+  */
+ public int getMessageCount(String clientId, String subscriptionName) throws IOException {
+     int rc;
+     EntityManager manager = adapter.beginEntityManager(null);
+     try {
+         Query query = manager.createQuery(
+             "select count(m) FROM StoredMessage m, StoredSubscription ss " +
+             "where ss.clientId=?1 " +
+             "and ss.subscriptionName=?2 " +
+             "and ss.destination=?3 " +
+             "and m.destination=ss.destination and m.id > ss.lastAckedId");
+         query.setParameter(1, clientId);
+         query.setParameter(2, subscriptionName);
+         // The destination is the third positional parameter; binding it at
+         // index 2 would overwrite the subscription name
+         query.setParameter(3, destinationName);
+         // The count aggregate is not an Integer; go through Number to avoid a ClassCastException
+         rc = ((Number) query.getSingleResult()).intValue();
+     } catch (Throwable e) {
+         adapter.rollbackEntityManager(null,manager);
+         throw IOExceptionSupport.create(e);
+     }
+     adapter.commitEntityManager(null,manager);
+     return rc;
+ }
+
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ SubscriptionInfo rc=null;
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ if( ss != null ) {
+ rc = new SubscriptionInfo();
+ rc.setClientId(ss.getClientId());
+ rc.setDestination(destination);
+ rc.setSelector(ss.getSelector());
+ rc.setSubcriptionName(ss.getSubscriptionName());
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null,manager);
+ return rc;
+ }
+
+ /**
+  * Replays the next batch of stored messages for one subscription, in
+  * ascending id order.
+  *
+  * The batch position is kept per subscription in subscriberLastMessageMap.
+  * On the first call for a subscription (or after resetBatching for it) the
+  * position is seeded from the subscription's stored last acked id.
+  *
+  * @param clientId the client id of the subscriber
+  * @param subscriptionName the name of the subscription
+  * @param maxReturned the maximum number of messages to pass to the listener
+  * @param listener receives each recovered message
+  * @throws Exception if the query, the unmarshalling or the listener fails
+  */
+ public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+     EntityManager manager = adapter.beginEntityManager(null);
+     try {
+         SubscriptionId id = new SubscriptionId();
+         id.setClientId(clientId);
+         id.setSubscriptionName(subscriptionName);
+         id.setDestination(destinationName);
+
+         AtomicLong last=subscriberLastMessageMap.get(id);
+         if(last==null){
+             // No batch in progress: start right after the last acknowledged message
+             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+             last=new AtomicLong(ss.getLastAckedId());
+             subscriberLastMessageMap.put(id,last);
+         }
+         final AtomicLong lastMessageId=last;
+
+         Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+         query.setParameter(1, destinationName);
+         query.setParameter(2, lastMessageId.get());
+         query.setMaxResults(maxReturned);
+         int count = 0;
+         for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+             Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+             listener.recoverMessage(message);
+             lastMessageId.set(m.getId());
+             count++;
+             if( count >= maxReturned ) {
+                 // Leave the loop instead of returning, so the commit below
+                 // still runs and the EntityManager is released
+                 break;
+             }
+         }
+     } catch (Throwable e) {
+         adapter.rollbackEntityManager(null,manager);
+         throw IOExceptionSupport.create(e);
+     }
+     adapter.commitEntityManager(null,manager);
+ }
+
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+
+ Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+ query.setParameter(1, destinationName);
+ query.setParameter(2, ss.getLastAckedId());
+ for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+ Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
+ listener.recoverMessage(message);
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null,manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null,manager);
+ }
+
+ public void resetBatching(String clientId, String subscriptionName) {
+ SubscriptionId id = new SubscriptionId();
+ id.setClientId(clientId);
+ id.setSubscriptionName(subscriptionName);
+ id.setDestination(destinationName);
+
+ subscriberLastMessageMap.remove(id);
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa.model;
+
+import javax.persistence.Basic;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+/**
+ */
+@Entity
+public class StoredMessage {
+
+ @Id
+ private long id;
+
+ @Basic
+ private String messageId;
+
+ @Basic
+ private String destination;
+
+ @Basic
+ private long exiration;
+
+ @Basic
+ private byte[] data;
+
+ public StoredMessage() {
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public long getExiration() {
+ return exiration;
+ }
+
+ public void setExiration(long exiration) {
+ this.exiration = exiration;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(String messageId) {
+ this.messageId = messageId;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long sequenceId) {
+ this.id = sequenceId;
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa.model;
+
+import javax.persistence.Basic;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+
+/**
+ * JPA entity describing one subscription: its destination, client id and
+ * subscription name, plus a selector string and a <code>lastAckedId</code> value.
+ * <p>
+ * The primary key is a <code>long</code> generated with
+ * <code>GenerationType.AUTO</code>. The destination, client id and subscription name
+ * are ordinary basic attributes; no uniqueness constraint over them is declared in
+ * this class.
+ * <p>
+ * NOTE(review): <code>lastAckedId</code> presumably holds the id of the last
+ * acknowledged <code>StoredMessage</code> - confirm against the topic message store.
+ */
+@Entity
+public class StoredSubscription {
+
+ /**
+ * Value class grouping a destination, client id and subscription name.
+ * <p>
+ * <code>equals</code> and <code>hashCode</code> are defined over all three fields
+ * and tolerate null values; <code>toString</code> joins them with ':'.
+ * <p>
+ * NOTE(review): nothing in the enclosing entity refers to this class as its
+ * identity class - the entity's primary key is the generated <code>long id</code>
+ * field. Confirm whether this class is still needed.
+ */
+ public static class SubscriptionId {
+
+ public String destination;
+ public String clientId;
+ public String subscriptionName;
+
+ public boolean equals(Object other) {
+ if (other == this)
+ return true;
+ if (!(other instanceof SubscriptionId))
+ return false;
+
+ SubscriptionId sid = (SubscriptionId) other;
+ return (destination == sid.destination || (destination != null && destination.equals(sid.destination)))
+ && (clientId == sid.clientId || (clientId != null && clientId.equals(sid.clientId)))
+ && (subscriptionName == sid.subscriptionName || (subscriptionName != null && subscriptionName.equals(sid.subscriptionName)));
+ }
+
+ /**
+ * XORs the hash codes of destination, clientId and subscriptionName (a null
+ * field contributes 0), so instances that are equal according to
+ * <code>equals</code> return the same hash code.
+ */
+ public int hashCode() {
+ return ((destination == null) ? 0 : destination.hashCode())
+ ^ ((clientId == null) ? 0 : clientId.hashCode())
+ ^ ((subscriptionName == null) ? 0 : subscriptionName.hashCode())
+ ;
+ }
+
+ public String toString() {
+ return destination + ":" + clientId + ":" + subscriptionName;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public void setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
+ }
+
+ @Id
+ @GeneratedValue(strategy=GenerationType.AUTO)
+ private long id;
+
+ @Basic
+ private String destination;
+ @Basic
+ private String clientId;
+ @Basic
+ private String subscriptionName;
+
+ @Basic
+ private long lastAckedId;
+ @Basic
+ private String selector;
+
+
+ public long getLastAckedId() {
+ return lastAckedId;
+ }
+
+ public void setLastAckedId(long lastAckedId) {
+ this.lastAckedId = lastAckedId;
+ }
+
+ public String getSelector() {
+ return selector;
+ }
+
+ public void setSelector(String selector) {
+ this.selector = selector;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public void setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java Mon Nov 20 10:13:50 2006
@@ -54,5 +54,14 @@
public void setOffset(int offset) {
this.offset = offset;
}
+ /**
+ * Shrinks the backing array to the used region: copies <code>length</code> bytes
+ * starting at <code>offset</code> into a new array of exactly <code>length</code>
+ * bytes, makes that array the new <code>data</code> and resets
+ * <code>offset</code> to 0. Does nothing when <code>length</code> already equals
+ * the backing array's length.
+ */
+ public void compact() {
+ if( length != data.length ) {
+ byte t[] = new byte[length];
+ System.arraycopy(data, offset, t, 0, length);
+ data=t;
+ offset=0;
+ }
+ }
}
Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml Mon Nov 20 10:13:50 2006
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2006 The Apache Software Foundation.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ version="1.0">
+ <persistence-unit name="activemq" transaction-type="RESOURCE_LOCAL">
+ <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+ <class>org.apache.activemq.store.jpa.model.StoredMessage</class>
+ <class>org.apache.activemq.store.jpa.model.StoredSubscription</class>
+ <!--
+ SubscriptionId is a nested class of StoredSubscription that carries no Entity
+ annotation, and its registration below is disabled.
+ NOTE(review): confirm whether this entry is still needed or can be removed.
+ <class>org.apache.activemq.store.jpa.model.StoredSubscription$SubscriptionId</class>
+ -->
+ </persistence-unit>
+</persistence>
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java?view=auto&rev=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java Mon Nov 20 10:13:50 2006
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.util.Properties;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.store.jpa.JPAPersistenceAdapter;
+
+/**
+ * Used to verify that recovery works correctly against a broker backed by the
+ * {@link JPAPersistenceAdapter}.
+ * <p>
+ * Both factory methods configure the adapter for OpenJPA with the embedded Derby
+ * driver, the database URL <code>jdbc:derby:activemq-data/derby;create=true</code>,
+ * schema synchronisation (<code>buildSchema</code>) and SQL trace logging.
+ * <code>createBroker()</code> additionally sets
+ * <code>deleteAllMessagesOnStartup</code> to true; <code>createRestartedBroker()</code>
+ * does not set that flag.
+ *
+ * @version $Revision$
+ */
+public class JPARecoveryBrokerTest extends RecoveryBrokerTest {
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService service = new BrokerService();
+ service.setDeleteAllMessagesOnStartup(true);
+ JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
+ Properties props = new Properties();
+ props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+ props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+ props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+ props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+ pa.setEntityManagerProperties(props);
+ service.setPersistenceAdapter(pa);
+ return service;
+
+ }
+
+ protected BrokerService createRestartedBroker() throws Exception {
+ BrokerService service = new BrokerService();
+ JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
+ Properties props = new Properties();
+ props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+ props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
+ props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+ props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+ pa.setEntityManagerProperties(props);
+ service.setPersistenceAdapter(pa);
+ return service;
+ }
+
+ public static Test suite() {
+ return suite(JPARecoveryBrokerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+}
Modified: incubator/activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/pom.xml?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/pom.xml (original)
+++ incubator/activemq/trunk/pom.xml Mon Nov 20 10:13:50 2006
@@ -314,6 +314,12 @@
<version>${commons-collections-version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence-jdbc</artifactId>
+ <version>${openjpa-version}</version>
+ </dependency>
+
<!-- Optional Spring Support -->
<dependency>
<groupId>org.springframework</groupId>
@@ -868,8 +874,9 @@
<axis-version>1.2-RC1</axis-version>
<cglib-version>2.0</cglib-version>
<commons-beanutils-version>1.6.1</commons-beanutils-version>
- <commons-collections-version>2.1</commons-collections-version>
- <commons-dbcp-version>1.2</commons-dbcp-version>
+ <commons-collections-version>3.1</commons-collections-version>
+ <openjpa-version>0.9.6-incubating</openjpa-version>
+ <commons-dbcp-version>1.2.1</commons-dbcp-version>
<commons-httpclient-version>2.0.1</commons-httpclient-version>
<commons-logging-version>1.1</commons-logging-version>
<commons-pool-version>1.2</commons-pool-version>
@@ -887,7 +894,7 @@
<junit-version>3.8.1</junit-version>
<jxta-version>2.0</jxta-version>
<log4j-version>1.2.12</log4j-version>
- <org-apache-derby-version>10.1.1.0</org-apache-derby-version>
+ <org-apache-derby-version>10.1.3.1</org-apache-derby-version>
<org-apache-geronimo-specs-version>1.0</org-apache-geronimo-specs-version>
<org-apache-maven-surefire-plugin-version>2.2</org-apache-maven-surefire-plugin-version>
<p2psockets-version>1.1.2</p2psockets-version>