You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/12 07:27:34 UTC
svn commit: r565003 [3/17] - in /activemq/trunk:
activemq-fileserver/src/main/java/org/apache/activemq/util/
activemq-fileserver/src/test/java/org/apache/activemq/util/
activemq-jaas/src/main/java/org/apache/activemq/jaas/
activemq-jaas/src/test/java/o...
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Sat Aug 11 22:27:21 2007
@@ -40,219 +40,213 @@
import org.apache.activemq.util.IOExceptionSupport;
public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore {
- private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>();
+ private Map<SubscriptionId, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<SubscriptionId, AtomicLong>();
- public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
- super(adapter, destination);
- }
-
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
- EntityManager manager = adapter.beginEntityManager(context);
- try {
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- ss.setLastAckedId(messageId.getBrokerSequenceId());
- } catch (Throwable e) {
- adapter.rollbackEntityManager(context,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(context,manager);
- }
-
- public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- StoredSubscription ss = new StoredSubscription();
- ss.setClientId(info.getClientId());
- ss.setSubscriptionName(info.getSubscriptionName());
- ss.setDestination(destinationName);
- ss.setSelector(info.getSelector());
- ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
- ss.setLastAckedId(-1);
-
- if( !retroactive ) {
- Query query = manager.createQuery("select max(m.id) from StoredMessage m");
- Long rc = (Long) query.getSingleResult();
- if( rc != null ) {
- ss.setLastAckedId(rc);
- }
- }
-
- manager.persist(ss);
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- manager.remove(ss);
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
- Query query = manager.createQuery(
- "select ss from StoredSubscription ss " +
- "where ss.clientId=?1 " +
- "and ss.subscriptionName=?2 " +
- "and ss.destination=?3");
- query.setParameter(1, clientId);
- query.setParameter(2, subscriptionName);
- query.setParameter(3, destinationName);
- List<StoredSubscription> resultList = query.getResultList();
- if( resultList.isEmpty() )
- return null;
- return resultList.get(0);
- }
-
- public SubscriptionInfo[] getAllSubscriptions() throws IOException {
- SubscriptionInfo rc[];
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
-
- Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
- query.setParameter(1, destinationName);
- for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
- SubscriptionInfo info = new SubscriptionInfo();
- info.setClientId(ss.getClientId());
- info.setDestination(destination);
- info.setSelector(ss.getSelector());
- info.setSubscriptionName(ss.getSubscriptionName());
- info.setSubscribedDestination(toSubscribedDestination(ss));
- l.add(info);
- }
-
- rc = new SubscriptionInfo[l.size()];
- l.toArray(rc);
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- return rc;
- }
-
- public int getMessageCount(String clientId, String subscriptionName) throws IOException {
- Long rc;
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- Query query = manager.createQuery(
- "select count(m) FROM StoredMessage m, StoredSubscription ss " +
- "where ss.clientId=?1 " +
- "and ss.subscriptionName=?2 " +
- "and ss.destination=?3 " +
- "and m.destination=ss.destination and m.id > ss.lastAckedId");
- query.setParameter(1, clientId);
- query.setParameter(2, subscriptionName);
- query.setParameter(3, destinationName);
- rc = (Long) query.getSingleResult();
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- return rc.intValue();
- }
-
- public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
- SubscriptionInfo rc=null;
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- if( ss != null ) {
- rc = new SubscriptionInfo();
- rc.setClientId(ss.getClientId());
- rc.setDestination(destination);
- rc.setSelector(ss.getSelector());
- rc.setSubscriptionName(ss.getSubscriptionName());
- rc.setSubscribedDestination(toSubscribedDestination(ss));
- }
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- return rc;
- }
-
- private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
- if( ss.getSubscribedDestination() == null )
- return null;
- return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
- }
-
- public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- SubscriptionId id = new SubscriptionId();
- id.setClientId(clientId);
- id.setSubscriptionName(subscriptionName);
- id.setDestination(destinationName);
-
- AtomicLong last=subscriberLastMessageMap.get(id);
- if(last==null){
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- last=new AtomicLong(ss.getLastAckedId());
- subscriberLastMessageMap.put(id,last);
- }
- final AtomicLong lastMessageId=last;
-
- Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
- query.setParameter(1, destinationName);
- query.setParameter(2, lastMessageId.get());
- query.setMaxResults(maxReturned);
- int count = 0;
- for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
- Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
- listener.recoverMessage(message);
- lastMessageId.set(m.getId());
- count++;
- if( count >= maxReturned ) {
- return;
- }
- }
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
-
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
-
- Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
- query.setParameter(1, destinationName);
- query.setParameter(2, ss.getLastAckedId());
- for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
- Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
- listener.recoverMessage(message);
- }
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- public void resetBatching(String clientId, String subscriptionName) {
- SubscriptionId id = new SubscriptionId();
- id.setClientId(clientId);
- id.setSubscriptionName(subscriptionName);
- id.setDestination(destinationName);
+ public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+ super(adapter, destination);
+ }
+
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(context);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ ss.setLastAckedId(messageId.getBrokerSequenceId());
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(context, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(context, manager);
+ }
+
+ public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = new StoredSubscription();
+ ss.setClientId(info.getClientId());
+ ss.setSubscriptionName(info.getSubscriptionName());
+ ss.setDestination(destinationName);
+ ss.setSelector(info.getSelector());
+ ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
+ ss.setLastAckedId(-1);
+
+ if (!retroactive) {
+ Query query = manager.createQuery("select max(m.id) from StoredMessage m");
+ Long rc = (Long)query.getSingleResult();
+ if (rc != null) {
+ ss.setLastAckedId(rc);
+ }
+ }
+
+ manager.persist(ss);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ manager.remove(ss);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
+ Query query = manager.createQuery("select ss from StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3");
+ query.setParameter(1, clientId);
+ query.setParameter(2, subscriptionName);
+ query.setParameter(3, destinationName);
+ List<StoredSubscription> resultList = query.getResultList();
+ if (resultList.isEmpty()) {
+ return null;
+ }
+ return resultList.get(0);
+ }
+
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+ SubscriptionInfo rc[];
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
+
+ Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
+ query.setParameter(1, destinationName);
+ for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.setClientId(ss.getClientId());
+ info.setDestination(destination);
+ info.setSelector(ss.getSelector());
+ info.setSubscriptionName(ss.getSubscriptionName());
+ info.setSubscribedDestination(toSubscribedDestination(ss));
+ l.add(info);
+ }
+
+ rc = new SubscriptionInfo[l.size()];
+ l.toArray(rc);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ return rc;
+ }
+
+ public int getMessageCount(String clientId, String subscriptionName) throws IOException {
+ Long rc;
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ Query query = manager.createQuery("select count(m) FROM StoredMessage m, StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3 "
+ + "and m.destination=ss.destination and m.id > ss.lastAckedId");
+ query.setParameter(1, clientId);
+ query.setParameter(2, subscriptionName);
+ query.setParameter(3, destinationName);
+ rc = (Long)query.getSingleResult();
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ return rc.intValue();
+ }
+
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ SubscriptionInfo rc = null;
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ if (ss != null) {
+ rc = new SubscriptionInfo();
+ rc.setClientId(ss.getClientId());
+ rc.setDestination(destination);
+ rc.setSelector(ss.getSelector());
+ rc.setSubscriptionName(ss.getSubscriptionName());
+ rc.setSubscribedDestination(toSubscribedDestination(ss));
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ return rc;
+ }
+
+ private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
+ if (ss.getSubscribedDestination() == null) {
+ return null;
+ }
+ return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
+ }
+
+ public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ SubscriptionId id = new SubscriptionId();
+ id.setClientId(clientId);
+ id.setSubscriptionName(subscriptionName);
+ id.setDestination(destinationName);
+
+ AtomicLong last = subscriberLastMessageMap.get(id);
+ if (last == null) {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ last = new AtomicLong(ss.getLastAckedId());
+ subscriberLastMessageMap.put(id, last);
+ }
+ final AtomicLong lastMessageId = last;
+
+ Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+ query.setParameter(1, destinationName);
+ query.setParameter(2, lastMessageId.get());
+ query.setMaxResults(maxReturned);
+ int count = 0;
+ for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+ Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData()));
+ listener.recoverMessage(message);
+ lastMessageId.set(m.getId());
+ count++;
+ if (count >= maxReturned) {
+ return;
+ }
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+
+ Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+ query.setParameter(1, destinationName);
+ query.setParameter(2, ss.getLastAckedId());
+ for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
+ Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData()));
+ listener.recoverMessage(message);
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ public void resetBatching(String clientId, String subscriptionName) {
+ SubscriptionId id = new SubscriptionId();
+ id.setClientId(clientId);
+ id.setSubscriptionName(subscriptionName);
+ id.setDestination(destinationName);
subscriberLastMessageMap.remove(id);
- }
+ }
}
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java Sat Aug 11 22:27:21 2007
@@ -38,221 +38,215 @@
import org.apache.activemq.util.IOExceptionSupport;
public class JPATopicReferenceStore extends JPAReferenceStore implements TopicReferenceStore {
- private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>();
+ private Map<SubscriptionId, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<SubscriptionId, AtomicLong>();
- public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
- super(adapter, destination);
- }
-
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
- EntityManager manager = adapter.beginEntityManager(context);
- try {
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- ss.setLastAckedId(messageId.getBrokerSequenceId());
- } catch (Throwable e) {
- adapter.rollbackEntityManager(context,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(context,manager);
- }
-
- public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- StoredSubscription ss = new StoredSubscription();
- ss.setClientId(info.getClientId());
- ss.setSubscriptionName(info.getSubcriptionName());
- ss.setDestination(destinationName);
- ss.setSelector(info.getSelector());
- ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
- ss.setLastAckedId(-1);
-
- if( !retroactive ) {
- Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
- Long rc = (Long) query.getSingleResult();
- if( rc != null ) {
- ss.setLastAckedId(rc);
- }
- }
-
- manager.persist(ss);
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- manager.remove(ss);
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
- Query query = manager.createQuery(
- "select ss from StoredSubscription ss " +
- "where ss.clientId=?1 " +
- "and ss.subscriptionName=?2 " +
- "and ss.destination=?3");
- query.setParameter(1, clientId);
- query.setParameter(2, subscriptionName);
- query.setParameter(3, destinationName);
- List<StoredSubscription> resultList = query.getResultList();
- if( resultList.isEmpty() )
- return null;
- return resultList.get(0);
- }
-
- public SubscriptionInfo[] getAllSubscriptions() throws IOException {
- SubscriptionInfo rc[];
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
-
- Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
- query.setParameter(1, destinationName);
- for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
- SubscriptionInfo info = new SubscriptionInfo();
- info.setClientId(ss.getClientId());
- info.setDestination(destination);
- info.setSelector(ss.getSelector());
- info.setSubscriptionName(ss.getSubscriptionName());
- info.setSubscribedDestination(toSubscribedDestination(ss));
- l.add(info);
- }
-
- rc = new SubscriptionInfo[l.size()];
- l.toArray(rc);
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- return rc;
- }
-
- private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
- if( ss.getSubscribedDestination() == null )
- return null;
- return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
- }
-
- public int getMessageCount(String clientId, String subscriptionName) throws IOException {
- Long rc;
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- Query query = manager.createQuery(
- "select count(m) FROM StoredMessageReference m, StoredSubscription ss " +
- "where ss.clientId=?1 " +
- "and ss.subscriptionName=?2 " +
- "and ss.destination=?3 " +
- "and m.destination=ss.destination and m.id > ss.lastAckedId");
- query.setParameter(1, clientId);
- query.setParameter(2, subscriptionName);
- query.setParameter(3, destinationName);
- rc = (Long) query.getSingleResult();
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- return rc.intValue();
- }
-
- public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
- SubscriptionInfo rc=null;
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- if( ss != null ) {
- rc = new SubscriptionInfo();
- rc.setClientId(ss.getClientId());
- rc.setDestination(destination);
- rc.setSelector(ss.getSelector());
- rc.setSubscriptionName(ss.getSubscriptionName());
- rc.setSubscribedDestination(toSubscribedDestination(ss));
- }
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- return rc;
- }
-
- public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
- SubscriptionId id = new SubscriptionId();
- id.setClientId(clientId);
- id.setSubscriptionName(subscriptionName);
- id.setDestination(destinationName);
-
- AtomicLong last=subscriberLastMessageMap.get(id);
- if(last==null){
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
- last=new AtomicLong(ss.getLastAckedId());
- subscriberLastMessageMap.put(id,last);
- }
- final AtomicLong lastMessageId=last;
-
- Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
- query.setParameter(1, destinationName);
- query.setParameter(2, lastMessageId.get());
- query.setMaxResults(maxReturned);
- int count = 0;
- for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
- MessageId mid = new MessageId(m.getMessageId());
- mid.setBrokerSequenceId(m.getId());
- listener.recoverMessageReference(mid);
-
- lastMessageId.set(m.getId());
- count++;
- if( count >= maxReturned ) {
- return;
- }
- }
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
- EntityManager manager = adapter.beginEntityManager(null);
- try {
-
- StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
-
- Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
- query.setParameter(1, destinationName);
- query.setParameter(2, ss.getLastAckedId());
- for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
- MessageId mid = new MessageId(m.getMessageId());
- mid.setBrokerSequenceId(m.getId());
- listener.recoverMessageReference(mid);
- }
- } catch (Throwable e) {
- adapter.rollbackEntityManager(null,manager);
- throw IOExceptionSupport.create(e);
- }
- adapter.commitEntityManager(null,manager);
- }
-
- public void resetBatching(String clientId, String subscriptionName) {
- SubscriptionId id = new SubscriptionId();
- id.setClientId(clientId);
- id.setSubscriptionName(subscriptionName);
- id.setDestination(destinationName);
+ public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
+ super(adapter, destination);
+ }
+
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(context);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ ss.setLastAckedId(messageId.getBrokerSequenceId());
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(context, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(context, manager);
+ }
+
+ public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = new StoredSubscription();
+ ss.setClientId(info.getClientId());
+ ss.setSubscriptionName(info.getSubcriptionName());
+ ss.setDestination(destinationName);
+ ss.setSelector(info.getSelector());
+ ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
+ ss.setLastAckedId(-1);
+
+ if (!retroactive) {
+ Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
+ Long rc = (Long)query.getSingleResult();
+ if (rc != null) {
+ ss.setLastAckedId(rc);
+ }
+ }
+
+ manager.persist(ss);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ manager.remove(ss);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
+ Query query = manager.createQuery("select ss from StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3");
+ query.setParameter(1, clientId);
+ query.setParameter(2, subscriptionName);
+ query.setParameter(3, destinationName);
+ List<StoredSubscription> resultList = query.getResultList();
+ if (resultList.isEmpty()) {
+ return null;
+ }
+ return resultList.get(0);
+ }
+
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+ SubscriptionInfo rc[];
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
+
+ Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
+ query.setParameter(1, destinationName);
+ for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.setClientId(ss.getClientId());
+ info.setDestination(destination);
+ info.setSelector(ss.getSelector());
+ info.setSubscriptionName(ss.getSubscriptionName());
+ info.setSubscribedDestination(toSubscribedDestination(ss));
+ l.add(info);
+ }
+
+ rc = new SubscriptionInfo[l.size()];
+ l.toArray(rc);
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ return rc;
+ }
+
+ private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
+ if (ss.getSubscribedDestination() == null) {
+ return null;
+ }
+ return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
+ }
+
+ public int getMessageCount(String clientId, String subscriptionName) throws IOException {
+ Long rc;
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ Query query = manager.createQuery("select count(m) FROM StoredMessageReference m, StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 "
+ + "and ss.destination=?3 " + "and m.destination=ss.destination and m.id > ss.lastAckedId");
+ query.setParameter(1, clientId);
+ query.setParameter(2, subscriptionName);
+ query.setParameter(3, destinationName);
+ rc = (Long)query.getSingleResult();
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ return rc.intValue();
+ }
+
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ SubscriptionInfo rc = null;
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ if (ss != null) {
+ rc = new SubscriptionInfo();
+ rc.setClientId(ss.getClientId());
+ rc.setDestination(destination);
+ rc.setSelector(ss.getSelector());
+ rc.setSubscriptionName(ss.getSubscriptionName());
+ rc.setSubscribedDestination(toSubscribedDestination(ss));
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ return rc;
+ }
+
+ public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+ SubscriptionId id = new SubscriptionId();
+ id.setClientId(clientId);
+ id.setSubscriptionName(subscriptionName);
+ id.setDestination(destinationName);
+
+ AtomicLong last = subscriberLastMessageMap.get(id);
+ if (last == null) {
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+ last = new AtomicLong(ss.getLastAckedId());
+ subscriberLastMessageMap.put(id, last);
+ }
+ final AtomicLong lastMessageId = last;
+
+ Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
+ query.setParameter(1, destinationName);
+ query.setParameter(2, lastMessageId.get());
+ query.setMaxResults(maxReturned);
+ int count = 0;
+ for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
+ MessageId mid = new MessageId(m.getMessageId());
+ mid.setBrokerSequenceId(m.getId());
+ listener.recoverMessageReference(mid);
+
+ lastMessageId.set(m.getId());
+ count++;
+ if (count >= maxReturned) {
+ return;
+ }
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+ EntityManager manager = adapter.beginEntityManager(null);
+ try {
+
+ StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
+
+ Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
+ query.setParameter(1, destinationName);
+ query.setParameter(2, ss.getLastAckedId());
+ for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
+ MessageId mid = new MessageId(m.getMessageId());
+ mid.setBrokerSequenceId(m.getId());
+ listener.recoverMessageReference(mid);
+ }
+ } catch (Throwable e) {
+ adapter.rollbackEntityManager(null, manager);
+ throw IOExceptionSupport.create(e);
+ }
+ adapter.commitEntityManager(null, manager);
+ }
+
+ public void resetBatching(String clientId, String subscriptionName) {
+ SubscriptionId id = new SubscriptionId();
+ id.setClientId(clientId);
+ id.setSubscriptionName(subscriptionName);
+ id.setDestination(destinationName);
subscriberLastMessageMap.remove(id);
- }
+ }
}
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java Sat Aug 11 22:27:21 2007
@@ -1,11 +1,12 @@
-/*
- * Copyright 2006 The Apache Software Foundation.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,16 +27,16 @@
*/
@Entity()
public class StoredMessage {
-
+
@Id
private long id;
-
- @Basic(optional=false)
- @Index(enabled=true, unique=false)
+
+ @Basic(optional = false)
+ @Index(enabled = true, unique = false)
private String messageId;
- @Basic(optional=false)
- @Index(enabled=true, unique=false)
+ @Basic(optional = false)
+ @Index(enabled = true, unique = false)
private String destination;
@Basic
@@ -48,44 +49,44 @@
public StoredMessage() {
}
- public byte[] getData() {
- return data;
- }
-
- public void setData(byte[] data) {
- this.data = data;
- }
-
- public String getDestination() {
- return destination;
- }
-
- public void setDestination(String destination) {
- this.destination = destination;
- }
-
- public long getExiration() {
- return exiration;
- }
-
- public void setExiration(long exiration) {
- this.exiration = exiration;
- }
-
- public String getMessageId() {
- return messageId;
- }
-
- public void setMessageId(String messageId) {
- this.messageId = messageId;
- }
-
- public long getId() {
- return id;
- }
-
- public void setId(long sequenceId) {
- this.id = sequenceId;
- }
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public long getExiration() {
+ return exiration;
+ }
+
+ public void setExiration(long exiration) {
+ this.exiration = exiration;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(String messageId) {
+ this.messageId = messageId;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long sequenceId) {
+ this.id = sequenceId;
+ }
}
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java Sat Aug 11 22:27:21 2007
@@ -1,11 +1,12 @@
-/*
- * Copyright 2006 The Apache Software Foundation.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,73 +26,79 @@
*/
@Entity()
public class StoredMessageReference {
-
+
@Id
private long id;
-
- @Basic(optional=false)
- @Index(enabled=true, unique=false)
+
+ @Basic(optional = false)
+ @Index(enabled = true, unique = false)
private String messageId;
- @Basic(optional=false)
- @Index(enabled=true, unique=false)
+ @Basic(optional = false)
+ @Index(enabled = true, unique = false)
private String destination;
@Basic
- @Index(enabled=false, unique=false)
+ @Index(enabled = false, unique = false)
private long exiration;
- @Basic(optional=false)
- @Index(enabled=false, unique=false)
- private int fileId;
-
- @Basic(optional=false)
- @Index(enabled=false, unique=false)
- private int offset;
+ @Basic(optional = false)
+ @Index(enabled = false, unique = false)
+ private int fileId;
+
+ @Basic(optional = false)
+ @Index(enabled = false, unique = false)
+ private int offset;
public StoredMessageReference() {
}
- public String getDestination() {
- return destination;
- }
- public void setDestination(String destination) {
- this.destination = destination;
- }
-
- public long getExiration() {
- return exiration;
- }
- public void setExiration(long exiration) {
- this.exiration = exiration;
- }
-
- public String getMessageId() {
- return messageId;
- }
- public void setMessageId(String messageId) {
- this.messageId = messageId;
- }
-
- public long getId() {
- return id;
- }
- public void setId(long sequenceId) {
- this.id = sequenceId;
- }
-
- public int getFileId() {
- return fileId;
- }
- public void setFileId(int fileId) {
- this.fileId = fileId;
- }
-
- public int getOffset() {
- return offset;
- }
- public void setOffset(int offset) {
- this.offset = offset;
- }
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public long getExiration() {
+ return exiration;
+ }
+
+ public void setExiration(long exiration) {
+ this.exiration = exiration;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(String messageId) {
+ this.messageId = messageId;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long sequenceId) {
+ this.id = sequenceId;
+ }
+
+ public int getFileId() {
+ return fileId;
+ }
+
+ public void setFileId(int fileId) {
+ this.fileId = fileId;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
}
Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java Sat Aug 11 22:27:21 2007
@@ -1,11 +1,12 @@
-/*
- * Copyright 2006 The Apache Software Foundation.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,81 +28,80 @@
*/
@Entity
public class StoredSubscription {
-
- /**
+
+ /**
* Application identity class for Magazine.
*/
public static class SubscriptionId {
- public String destination;
- public String clientId;
- public String subscriptionName;
+ public String destination;
+ public String clientId;
+ public String subscriptionName;
public boolean equals(Object other) {
- if (other == this)
+ if (other == this) {
return true;
- if (!(other instanceof SubscriptionId))
+ }
+ if (!(other instanceof SubscriptionId)) {
return false;
-
- SubscriptionId sid = (SubscriptionId) other;
+ }
+
+ SubscriptionId sid = (SubscriptionId)other;
return (destination == sid.destination || (destination != null && destination.equals(sid.destination)))
- && (clientId == sid.clientId || (clientId != null && clientId.equals(sid.clientId)))
- && (subscriptionName == sid.subscriptionName || (subscriptionName != null && subscriptionName.equals(sid.subscriptionName)));
+ && (clientId == sid.clientId || (clientId != null && clientId.equals(sid.clientId)))
+ && (subscriptionName == sid.subscriptionName || (subscriptionName != null && subscriptionName.equals(sid.subscriptionName)));
}
-
+
/**
* Hashcode must also depend on identity values.
*/
public int hashCode() {
- return ((destination == null) ? 0 : destination.hashCode())
- ^ ((clientId == null) ? 0 : clientId.hashCode())
- ^ ((subscriptionName == null) ? 0 : subscriptionName.hashCode())
- ;
- }
+ return ((destination == null) ? 0 : destination.hashCode()) ^ ((clientId == null) ? 0 : clientId.hashCode()) ^ ((subscriptionName == null) ? 0 : subscriptionName.hashCode());
+ }
public String toString() {
return destination + ":" + clientId + ":" + subscriptionName;
}
- public String getClientId() {
- return clientId;
- }
-
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
- public String getDestination() {
- return destination;
- }
-
- public void setDestination(String destination) {
- this.destination = destination;
- }
-
- public String getSubscriptionName() {
- return subscriptionName;
- }
-
- public void setSubscriptionName(String subscriptionName) {
- this.subscriptionName = subscriptionName;
- }
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public void setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
}
@Id
- @GeneratedValue(strategy=GenerationType.AUTO)
+ @GeneratedValue(strategy = GenerationType.AUTO)
private long id;
-
+
@Basic
- @Index(enabled=true, unique=false)
+ @Index(enabled = true, unique = false)
private String destination;
@Basic
- @Index(enabled=true, unique=false)
+ @Index(enabled = true, unique = false)
private String clientId;
@Basic
- @Index(enabled=true, unique=false)
+ @Index(enabled = true, unique = false)
private String subscriptionName;
-
+
@Basic
private long lastAckedId;
@Basic
@@ -109,59 +109,59 @@
@Basic
private String subscribedDestination;
- public long getLastAckedId() {
- return lastAckedId;
- }
-
- public void setLastAckedId(long lastAckedId) {
- this.lastAckedId = lastAckedId;
- }
-
- public String getSelector() {
- return selector;
- }
-
- public void setSelector(String selector) {
- this.selector = selector;
- }
-
- public String getDestination() {
- return destination;
- }
-
- public void setDestination(String destination) {
- this.destination = destination;
- }
-
- public String getClientId() {
- return clientId;
- }
-
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
- public String getSubscriptionName() {
- return subscriptionName;
- }
-
- public void setSubscriptionName(String subscriptionName) {
- this.subscriptionName = subscriptionName;
- }
-
- public long getId() {
- return id;
- }
-
- public void setId(long id) {
- this.id = id;
- }
-
- public String getSubscribedDestination() {
- return subscribedDestination;
- }
-
- public void setSubscribedDestination(String subscribedDestination) {
- this.subscribedDestination = subscribedDestination;
- }
+ public long getLastAckedId() {
+ return lastAckedId;
+ }
+
+ public void setLastAckedId(long lastAckedId) {
+ this.lastAckedId = lastAckedId;
+ }
+
+ public String getSelector() {
+ return selector;
+ }
+
+ public void setSelector(String selector) {
+ this.selector = selector;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public void setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getSubscribedDestination() {
+ return subscribedDestination;
+ }
+
+ public void setSubscribedDestination(String subscribedDestination) {
+ this.subscribedDestination = subscribedDestination;
+ }
}
Modified: activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java (original)
+++ activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java Sat Aug 11 22:27:21 2007
@@ -23,25 +23,24 @@
import org.springframework.core.io.ClassPathResource;
/**
- *
* @version $Revision$
*/
-public class JPAStoreLoadTester extends LoadTester {
+public class JPAStoreLoadTester extends LoadTester {
protected BrokerService createBroker() throws Exception {
- BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/jpabroker.xml"));
+ BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/jpabroker.xml"));
brokerFactory.afterPropertiesSet();
- BrokerService broker = brokerFactory.getBroker();
+ BrokerService broker = brokerFactory.getBroker();
broker.setDeleteAllMessagesOnStartup(true);
return broker;
}
-
+
public static Test suite() {
return suite(JPAStoreLoadTester.class);
}
-
+
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
-
+
}
Modified: activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java (original)
+++ activemq/trunk/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java Sat Aug 11 22:27:21 2007
@@ -23,25 +23,24 @@
import org.springframework.core.io.ClassPathResource;
/**
- *
* @version $Revision$
*/
-public class QuickJPAStoreLoadTester extends LoadTester {
+public class QuickJPAStoreLoadTester extends LoadTester {
protected BrokerService createBroker() throws Exception {
- BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/quickjpabroker.xml"));
+ BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/quickjpabroker.xml"));
brokerFactory.afterPropertiesSet();
- BrokerService broker = brokerFactory.getBroker();
+ BrokerService broker = brokerFactory.getBroker();
broker.setDeleteAllMessagesOnStartup(true);
return broker;
}
-
+
public static Test suite() {
return suite(QuickJPAStoreLoadTester.class);
}
-
+
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
-
+
}
Modified: activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CGeneratorTask.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CGeneratorTask.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CGeneratorTask.java (original)
+++ activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CGeneratorTask.java Sat Aug 11 22:27:21 2007
@@ -26,94 +26,93 @@
import org.codehaus.jam.JamServiceParams;
/**
- *
* @version $Revision: 384826 $
*/
public class CGeneratorTask extends Task {
-
- int version = 2;
- File source = new File(".");
- File target = new File(".");
-
+
+ int version = 2;
+ File source = new File(".");
+ File target = new File(".");
+
public static void main(String[] args) {
-
+
Project project = new Project();
project.init();
- CGeneratorTask generator = new CGeneratorTask();
- generator.setProject(project);
-
- if( args.length > 0 ) {
- generator.version = Integer.parseInt(args[0]);
- }
-
- if( args.length > 1 ) {
- generator.source = new File(args[1]);
- }
-
- if( args.length > 2 ) {
- generator.target = new File(args[2]);
- }
-
- generator.execute();
- }
-
+ CGeneratorTask generator = new CGeneratorTask();
+ generator.setProject(project);
+
+ if (args.length > 0) {
+ generator.version = Integer.parseInt(args[0]);
+ }
+
+ if (args.length > 1) {
+ generator.source = new File(args[1]);
+ }
+
+ if (args.length > 2) {
+ generator.target = new File(args[2]);
+ }
+
+ generator.execute();
+ }
+
public void execute() throws BuildException {
try {
-
- String sourceDir = source+"/src/main/java";
-
+
+ String sourceDir = source + "/src/main/java";
+
System.out.println("Parsing source files in: " + sourceDir);
JamServiceFactory jamServiceFactory = JamServiceFactory.getInstance();
- JamServiceParams params = jamServiceFactory.createServiceParams();
- File[] dirs = new File[]{new File(sourceDir)};
+ JamServiceParams params = jamServiceFactory.createServiceParams();
+ File[] dirs = new File[] {
+ new File(sourceDir)
+ };
params.includeSourcePattern(dirs, "**/*.java");
JamService jam = jamServiceFactory.createService(params);
{
- CHeadersGenerator script = new CHeadersGenerator();
- script.setJam(jam);
- script.setTargetDir(target+"/src/libopenwire");
- script.setOpenwireVersion(version);
- script.run();
+ CHeadersGenerator script = new CHeadersGenerator();
+ script.setJam(jam);
+ script.setTargetDir(target + "/src/libopenwire");
+ script.setOpenwireVersion(version);
+ script.run();
}
{
- CSourcesGenerator script = new CSourcesGenerator();
- script.setJam(jam);
- script.setTargetDir(target+"/src/libopenwire");
- script.setOpenwireVersion(version);
- script.run();
+ CSourcesGenerator script = new CSourcesGenerator();
+ script.setJam(jam);
+ script.setTargetDir(target + "/src/libopenwire");
+ script.setOpenwireVersion(version);
+ script.run();
}
-
-
-
+
} catch (Exception e) {
- throw new BuildException(e);
+ throw new BuildException(e);
}
}
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public File getSource() {
- return source;
- }
-
- public void setSource(File basedir) {
- this.source = basedir;
- }
-
- public File getTarget() {
- return target;
- }
-
- public void setTarget(File target) {
- this.target = target;
- }
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public File getSource() {
+ return source;
+ }
+
+ public void setSource(File basedir) {
+ this.source = basedir;
+ }
+
+ public File getTarget() {
+ return target;
+ }
+
+ public void setTarget(File target) {
+ this.target = target;
+ }
}
Modified: activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CHeadersGenerator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CHeadersGenerator.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CHeadersGenerator.java (original)
+++ activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CHeadersGenerator.java Sat Aug 11 22:27:21 2007
@@ -32,7 +32,6 @@
import org.codehaus.jam.JProperty;
/**
- *
* @version $Revision: 383749 $
*/
public class CHeadersGenerator extends SingleSourceGenerator {
@@ -56,23 +55,22 @@
}
protected void generateLicence(PrintWriter out) {
-out.println("/**");
-out.println(" *");
-out.println(" * Licensed to the Apache Software Foundation (ASF) under one or more");
-out.println(" * contributor license agreements. See the NOTICE file distributed with");
-out.println(" * this work for additional information regarding copyright ownership.");
-out.println(" * The ASF licenses this file to You under the Apache License, Version 2.0");
-out.println(" * (the \"License\"); you may not use this file except in compliance with");
-out.println(" * the License. You may obtain a copy of the License at");
-out.println(" *");
-out.println(" * http://www.apache.org/licenses/LICENSE-2.0");
-out.println(" *");
-out.println(" * Unless required by applicable law or agreed to in writing, software");
-out.println(" * distributed under the License is distributed on an \"AS IS\" BASIS,");
-out.println(" * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.");
-out.println(" * See the License for the specific language governing permissions and");
-out.println(" * limitations under the License.");
-out.println(" */");
+ out.println("/**");
+ out.println(" * Licensed to the Apache Software Foundation (ASF) under one or more");
+ out.println(" * contributor license agreements. See the NOTICE file distributed with");
+ out.println(" * this work for additional information regarding copyright ownership.");
+ out.println(" * The ASF licenses this file to You under the Apache License, Version 2.0");
+ out.println(" * (the \"License\"); you may not use this file except in compliance with");
+ out.println(" * the License. You may obtain a copy of the License at");
+ out.println(" *");
+ out.println(" * http://www.apache.org/licenses/LICENSE-2.0");
+ out.println(" *");
+ out.println(" * Unless required by applicable law or agreed to in writing, software");
+ out.println(" * distributed under the License is distributed on an \"AS IS\" BASIS,");
+ out.println(" * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.");
+ out.println(" * See the License for the specific language governing permissions and");
+ out.println(" * limitations under the License.");
+ out.println(" */");
}
String changeCase(String value) {
@@ -80,9 +78,9 @@
char[] cs = value.toCharArray();
for (int i = 0; i < cs.length; i++) {
char c = cs[i];
- if (Character.isUpperCase((char) c)) {
+ if (Character.isUpperCase((char)c)) {
b.append('_');
- b.append(Character.toLowerCase((char) c));
+ b.append(Character.toLowerCase((char)c));
} else {
b.append(c);
}
@@ -99,33 +97,36 @@
*/
protected List<JClass> sort(List source) {
LinkedHashMap<JClass, JClass> rc = new LinkedHashMap<JClass, JClass>();
- ArrayList classes = new ArrayList(source);
- Collections.sort(classes, new Comparator(){
- public int compare(Object o1, Object o2) {
- JClass c1 = (JClass) o1;
- JClass c2 = (JClass) o2;
- return c1.getSimpleName().compareTo(c2.getSimpleName());
- }});
-
+ ArrayList classes = new ArrayList(source);
+ Collections.sort(classes, new Comparator() {
+ public int compare(Object o1, Object o2) {
+ JClass c1 = (JClass)o1;
+ JClass c2 = (JClass)o2;
+ return c1.getSimpleName().compareTo(c2.getSimpleName());
+ }
+ });
+
// lets make a map of all the class names
HashMap<JClass, JClass> classNames = new HashMap<JClass, JClass>();
for (Iterator iter = classes.iterator(); iter.hasNext();) {
- JClass c = (JClass) iter.next();
+ JClass c = (JClass)iter.next();
classNames.put(c, c);
}
// Add all classes that have no parent first
for (Iterator iter = classes.iterator(); iter.hasNext();) {
- JClass c = (JClass) iter.next();
- if (!classNames.containsKey(c.getSuperclass()))
+ JClass c = (JClass)iter.next();
+ if (!classNames.containsKey(c.getSuperclass())) {
rc.put(c, c);
+ }
}
// now lets add the rest
for (Iterator iter = classes.iterator(); iter.hasNext();) {
- JClass c = (JClass) iter.next();
- if (!rc.containsKey(c))
- rc.put(c,c);
+ JClass c = (JClass)iter.next();
+ if (!rc.containsKey(c)) {
+ rc.put(c, c);
+ }
}
return new ArrayList<JClass>(rc.keySet());
@@ -151,109 +152,109 @@
for (Iterator<JProperty> iter = properties.iterator(); iter.hasNext();) {
JProperty property = iter.next();
JAnnotation annotation = property.getGetter().getAnnotation("openwire:property");
- JAnnotationValue size = annotation.getValue("size");
+// JAnnotationValue size = annotation.getValue("size");
String name = toPropertyCase(property.getSimpleName());
- boolean cached = isCachedProperty(property);
+// boolean cached = isCachedProperty(property);
String type = property.getType().getQualifiedName();
if (type.equals("boolean")) {
- out.println(" ow_"+type+" "+name+";");
+ out.println(" ow_" + type + " " + name + ";");
} else if (type.equals("byte")) {
- out.println(" ow_"+type+" "+name+";");
+ out.println(" ow_" + type + " " + name + ";");
} else if (type.equals("char")) {
- out.println(" ow_"+type+" "+name+";");
+ out.println(" ow_" + type + " " + name + ";");
} else if (type.equals("short")) {
- out.println(" ow_"+type+" "+name+";");
+ out.println(" ow_" + type + " " + name + ";");
} else if (type.equals("int")) {
- out.println(" ow_"+type+" "+name+";");
+ out.println(" ow_" + type + " " + name + ";");
} else if (type.equals("long")) {
- out.println(" ow_"+type+" "+name+";");
+ out.println(" ow_" + type + " " + name + ";");
} else if (type.equals("byte[]")) {
- out.println(" ow_byte_array *"+name+";");
+ out.println(" ow_byte_array *" + name + ";");
} else if (type.equals("org.apache.activeio.packet.ByteSequence")) {
- out.println(" ow_byte_array *"+name+";");
+ out.println(" ow_byte_array *" + name + ";");
} else if (type.equals("org.apache.activeio.packet.ByteSequence")) {
- out.println(" ow_byte_array *"+name+";");
+ out.println(" ow_byte_array *" + name + ";");
} else if (type.equals("java.lang.String")) {
- out.println(" ow_string *"+name+";");
+ out.println(" ow_string *" + name + ";");
} else {
if (property.getType().isArrayType()) {
- out.println(" ow_DataStructure_array *"+name+";");
+ out.println(" ow_DataStructure_array *" + name + ";");
} else if (isThrowable(property.getType())) {
- out.println(" ow_throwable *"+name+";");
+ out.println(" ow_throwable *" + name + ";");
} else {
- out.println(" struct ow_" + property.getType().getSimpleName() + " *"+name+";");
+ out.println(" struct ow_" + property.getType().getSimpleName() + " *" + name + ";");
}
}
}
}
-
-
+
protected void generateSetup(PrintWriter out) {
generateLicence(out);
-out.println("");
-out.println("/*****************************************************************************************");
-out.println(" * ");
-out.println(" * NOTE!: This file is auto generated - do not modify!");
-out.println(" * if you need to make a change, please see the modify the groovy scripts in the");
-out.println(" * under src/gram/script and then use maven openwire:generate to regenerate ");
-out.println(" * this file.");
-out.println(" * ");
-out.println(" *****************************************************************************************/");
-out.println(" ");
-out.println("#ifndef OW_COMMANDS_V"+openwireVersion+"_H");
-out.println("#define OW_COMMANDS_V"+openwireVersion+"_H");
-out.println("");
-out.println("#include \"ow.h\"");
-out.println("");
-out.println("#ifdef __cplusplus");
-out.println("extern \"C\" {");
-out.println("#endif /* __cplusplus */");
-out.println(" ");
-out.println("#define OW_WIREFORMAT_VERSION "+openwireVersion+"");
-
-out.println("#define OW_WIREFORMAT_STACK_TRACE_MASK 0x00000001;");
-out.println("#define OW_WIREFORMAT_TCP_NO_DELAY_MASK 0x00000002;");
-out.println("#define OW_WIREFORMAT_CACHE_MASK 0x00000004;");
-out.println("#define OW_WIREFORMAT_COMPRESSION_MASK 0x00000008;");
-
- for (Iterator iterator = sortedClasses.iterator(); iterator.hasNext();) {
- JClass jclass = (JClass) iterator.next();
- String name = jclass.getSimpleName();
- String type = ("ow_"+name).toUpperCase()+"_TYPE";
- if( !isAbstract(jclass) ) {
- out.println("#define "+type+" "+getOpenWireOpCode(jclass));
- }
- }
-
-out.println(" ");
-out.println("apr_status_t ow_bitmarshall(ow_bit_buffer *buffer, ow_DataStructure *object);");
-out.println("apr_status_t ow_marshall(ow_byte_buffer *buffer, ow_DataStructure *object);");
+ out.println("");
+ out.println("/*****************************************************************************************");
+ out.println(" * ");
+ out.println(" * NOTE!: This file is auto generated - do not modify!");
+ out.println(" * if you need to make a change, please see the modify the groovy scripts in the");
+ out.println(" * under src/gram/script and then use maven openwire:generate to regenerate ");
+ out.println(" * this file.");
+ out.println(" * ");
+ out.println(" *****************************************************************************************/");
+ out.println(" ");
+ out.println("#ifndef OW_COMMANDS_V" + openwireVersion + "_H");
+ out.println("#define OW_COMMANDS_V" + openwireVersion + "_H");
+ out.println("");
+ out.println("#include \"ow.h\"");
+ out.println("");
+ out.println("#ifdef __cplusplus");
+ out.println("extern \"C\" {");
+ out.println("#endif /* __cplusplus */");
+ out.println(" ");
+ out.println("#define OW_WIREFORMAT_VERSION " + openwireVersion + "");
+
+ out.println("#define OW_WIREFORMAT_STACK_TRACE_MASK 0x00000001;");
+ out.println("#define OW_WIREFORMAT_TCP_NO_DELAY_MASK 0x00000002;");
+ out.println("#define OW_WIREFORMAT_CACHE_MASK 0x00000004;");
+ out.println("#define OW_WIREFORMAT_COMPRESSION_MASK 0x00000008;");
+
+ for (Iterator iterator = sortedClasses.iterator(); iterator.hasNext();) {
+ JClass jclass = (JClass)iterator.next();
+ String name = jclass.getSimpleName();
+ String type = ("ow_" + name).toUpperCase() + "_TYPE";
+ if (!isAbstract(jclass)) {
+ out.println("#define " + type + " " + getOpenWireOpCode(jclass));
+ }
+ }
+
+ out.println(" ");
+ out.println("apr_status_t ow_bitmarshall(ow_bit_buffer *buffer, ow_DataStructure *object);");
+ out.println("apr_status_t ow_marshall(ow_byte_buffer *buffer, ow_DataStructure *object);");
}
-
+
protected void generateFile(PrintWriter out) throws Exception {
String structName = jclass.getSimpleName();
-out.println("");
-out.println("typedef struct ow_"+structName+" {");
+ out.println("");
+ out.println("typedef struct ow_" + structName + " {");
- // This recusivly generates the field definitions of the class and it's supper classes.
+ // This recusivly generates the field definitions of the class and it's
+ // supper classes.
generateFields(out, jclass);
-out.println("");
-out.println("} ow_"+structName+";");
-out.println("ow_"+structName+" *ow_"+structName+"_create(apr_pool_t *pool);");
-out.println("ow_boolean ow_is_a_"+structName+"(ow_DataStructure *object);");
+ out.println("");
+ out.println("} ow_" + structName + ";");
+ out.println("ow_" + structName + " *ow_" + structName + "_create(apr_pool_t *pool);");
+ out.println("ow_boolean ow_is_a_" + structName + "(ow_DataStructure *object);");
}
-
+
protected void generateTearDown(PrintWriter out) {
-out.println("");
-out.println("#ifdef __cplusplus");
-out.println("}");
-out.println("#endif");
-out.println("");
-out.println("#endif /* ! OW_COMMANDS_V"+openwireVersion+"_H */");
+ out.println("");
+ out.println("#ifdef __cplusplus");
+ out.println("}");
+ out.println("#endif");
+ out.println("");
+ out.println("#endif /* ! OW_COMMANDS_V" + openwireVersion + "_H */");
}
}