You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by Heike Potter <He...@twinsoft.de> on 2017/08/10 22:00:36 UTC

Oracle - delivered Messages remain in ACTIVEMQ_MSGS

We are running an application on centos with tomcat 8.0.28, java 1.8 and
activemq 5.13.4
For persistency we are using Oracle 11.2

We have 3 queues with several producers and consumers.

From time to time we have the effect that messages remain in the
ACTIVEMQ_MSGS table in the oracle database though these messages are
consumed correctly (we get correct results).
These messages never get removed from the ACTIVEQ_MSG table. Our problem
occurs, when we restart activemq, 
because these messages are redelivered.

We checked the activemq-Logfile, the enqueue/dequeue counters for the queues
are identical.
What is astonishing us is the following logged DELETE command:
---
2017-06-09 10:24:58,281 [ Adaptor Task-2] DEBUG JournalPersistenceAdapter     
- Checkpoint started.
2017-06-09 10:24:58,291 [ Adaptor Task-2] DEBUG JDBCPersistenceAdapter        
- Cleaning up old messages.
2017-06-09 10:24:58,291 [ Adaptor Task-2] DEBUG DefaultJDBCAdapter            
- Executing SQL: DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <=     
( SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID)       FROM ACTIVEMQ_ACKS WHERE
ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER        AND
ACTIVEMQ_ACKS.PRIORITY=?)   )
2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG DefaultJDBCAdapter            
- Deleted 0 old message(s) at priority: 0
2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG JDBCPersistenceAdapter        
- Cleanup done.
2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG JournalPersistenceAdapter     
- Checkpoint done.
---
Though we have entries in ACTIVEMQ_MSGS we do not have entries in the
ACTIVEMQ_ACKS table, so if I understand it correctly the statement above can
never remove any records.

Here is our configuration file:
---
  <broker brokerName="KWG_BROKER"  useJmx="true"
xmlns="http://activemq.apache.org/schema/core">

    <persistenceFactory>
        <journalPersistenceAdapterFactory journalLogFiles="5"
dataDirectory="${activemq.data}/activemq-data" dataSource="#oracle-ds"/>
    </persistenceFactory> 


    <transportConnectors>
        <transportConnector
uri=&quot;tcp://&lt;ourhost>:8016?transport.trace=false&amp;transport.soTimeout=60000&amp;keepAlive=true"/>
    </transportConnectors>

  </broker>

    <bean id="oracle-ds" class="org.apache.commons.dbcp2.BasicDataSource"
destroy-method="close">
     <property name="driverClassName" value="oracle.jdbc.OracleDriver"/>
     <property name=&quot;url&quot;
value=&quot;jdbc:oracle:thin:@&lt;ourhost2>:1521/<ourdbname>"/>
     <property name="username" value="benutzer"/>
     <property name="password" value="passwort"/>
     <property name="poolPreparedStatements" value="true"/>
   </bean>
   
---

And a code snippet:
----
    public void onMessage(final Message message, final Session session) {
        try {
            _log.debug("OmqListener.On Message Beginn");
            ObjectMessage om = (ObjectMessage) message;
            Object obj = om.getObject();
            if (obj instanceof OutMessage) {
                OutMessage ome = (OutMessage) obj;
                String messtype = ome.getTyp().toString();
                String oid = message.getStringProperty(Constants.OID);
                String erstellungsdatum =
message.getStringProperty(Constants.EINSTELLUNGSDATUM);
                _log.debug(new StringBuffer("Message erhalten
Typus=").append(messtype)
                        .append("; OID=").append(oid)
                        .append(";
Erstellungsdatum=").append(erstellungsdatum)
                       
.append(";Logging=").append(ome.isLogging()).toString());
                _ausgangsService.performOutmessage(ome,
_propholder.isAntwortLoeschenAusDb());
                _log.info("onMessage: Anzahl Versuche = " +
ome.getAnzahlVersuche());
                
            }    
            _log.debug("OmqListener.On Message Ende");
        } catch (Throwable e) {
            //auf erneute Zustellversuche(Timer!) warten 
            _log.error("Fehler!", e);
        }
    }
---

When we restart activemq, what possibilities do I have to filter out in my
programme those messages that have already been delivered ?
I could delete messages from ACTIVEMQ_MSGS before we restart activemq but
how can I decide between old messages (already delivered) and those that
still need to be delivered ?

Thanks !





--
View this message in context: http://activemq.2283324.n4.nabble.com/Oracle-delivered-Messages-remain-in-ACTIVEMQ-MSGS-tp4729613.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.

Re: Oracle - delivered Messages remain in ACTIVEMQ_MSGS

Posted by Gary Tully <ga...@gmail.com>.
That DELETE statement is for durable topic subscribers. For queues, a
delete is done when the message is acked, does you application acknowledge
receipt of the message after onDelivery or is the session auto ack?

On Thu, 10 Aug 2017 at 23:02 Heike Potter <He...@twinsoft.de> wrote:

> We are running an application on centos with tomcat 8.0.28, java 1.8 and
> activemq 5.13.4
> For persistency we are using Oracle 11.2
>
> We have 3 queues with several producers and consumers.
>
> From time to time we have the effect that messages remain in the
> ACTIVEMQ_MSGS table in the oracle database though these messages are
> consumed correctly (we get correct results).
> These messages never get removed from the ACTIVEQ_MSG table. Our problem
> occurs, when we restart activemq,
> because these messages are redelivered.
>
> We checked the activemq-Logfile, the enqueue/dequeue counters for the
> queues
> are identical.
> What is astonishing us is the following logged DELETE command:
> ---
> 2017-06-09 10:24:58,281 [ Adaptor Task-2] DEBUG JournalPersistenceAdapter
> - Checkpoint started.
> 2017-06-09 10:24:58,291 [ Adaptor Task-2] DEBUG JDBCPersistenceAdapter
> - Cleaning up old messages.
> 2017-06-09 10:24:58,291 [ Adaptor Task-2] DEBUG DefaultJDBCAdapter
> - Executing SQL: DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <=
> ( SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID)       FROM ACTIVEMQ_ACKS WHERE
> ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER        AND
> ACTIVEMQ_ACKS.PRIORITY=?)   )
> 2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG DefaultJDBCAdapter
> - Deleted 0 old message(s) at priority: 0
> 2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG JDBCPersistenceAdapter
> - Cleanup done.
> 2017-06-09 10:24:58,300 [ Adaptor Task-2] DEBUG JournalPersistenceAdapter
> - Checkpoint done.
> ---
> Though we have entries in ACTIVEMQ_MSGS we do not have entries in the
> ACTIVEMQ_ACKS table, so if I understand it correctly the statement above
> can
> never remove any records.
>
> Here is our configuration file:
> ---
>   <broker brokerName="KWG_BROKER"  useJmx="true"
> xmlns="http://activemq.apache.org/schema/core">
>
>     <persistenceFactory>
>         <journalPersistenceAdapterFactory journalLogFiles="5"
> dataDirectory="${activemq.data}/activemq-data" dataSource="#oracle-ds"/>
>     </persistenceFactory>
>
>
>     <transportConnectors>
>         <transportConnector
>
> uri=&quot;tcp://&lt;ourhost>:8016?transport.trace=false&amp;transport.soTimeout=60000&amp;keepAlive=true"/>
>     </transportConnectors>
>
>   </broker>
>
>     <bean id="oracle-ds" class="org.apache.commons.dbcp2.BasicDataSource"
> destroy-method="close">
>      <property name="driverClassName" value="oracle.jdbc.OracleDriver"/>
>      <property name=&quot;url&quot;
> value=&quot;jdbc:oracle:thin:@&lt;ourhost2>:1521/<ourdbname>"/>
>      <property name="username" value="benutzer"/>
>      <property name="password" value="passwort"/>
>      <property name="poolPreparedStatements" value="true"/>
>    </bean>
>
> ---
>
> And a code snippet:
> ----
>     public void onMessage(final Message message, final Session session) {
>         try {
>             _log.debug("OmqListener.On Message Beginn");
>             ObjectMessage om = (ObjectMessage) message;
>             Object obj = om.getObject();
>             if (obj instanceof OutMessage) {
>                 OutMessage ome = (OutMessage) obj;
>                 String messtype = ome.getTyp().toString();
>                 String oid = message.getStringProperty(Constants.OID);
>                 String erstellungsdatum =
> message.getStringProperty(Constants.EINSTELLUNGSDATUM);
>                 _log.debug(new StringBuffer("Message erhalten
> Typus=").append(messtype)
>                         .append("; OID=").append(oid)
>                         .append(";
> Erstellungsdatum=").append(erstellungsdatum)
>
> .append(";Logging=").append(ome.isLogging()).toString());
>                 _ausgangsService.performOutmessage(ome,
> _propholder.isAntwortLoeschenAusDb());
>                 _log.info("onMessage: Anzahl Versuche = " +
> ome.getAnzahlVersuche());
>
>             }
>             _log.debug("OmqListener.On Message Ende");
>         } catch (Throwable e) {
>             //auf erneute Zustellversuche(Timer!) warten
>             _log.error("Fehler!", e);
>         }
>     }
> ---
>
> When we restart activemq, what possibilities do I have to filter out in my
> programme those messages that have already been delivered ?
> I could delete messages from ACTIVEMQ_MSGS before we restart activemq but
> how can I decide between old messages (already delivered) and those that
> still need to be delivered ?
>
> Thanks !
>
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Oracle-delivered-Messages-remain-in-ACTIVEMQ-MSGS-tp4729613.html
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>