You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/09/05 16:10:37 UTC

svn commit: r692449 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin: ./ DiscardingDLQBroker.java DiscardingDLQBrokerPlugin.java

Author: rajdavies
Date: Fri Sep  5 07:10:36 2008
New Revision: 692449

URL: http://svn.apache.org/viewvc?rev=692449&view=rev
Log:
Patch applied for https://issues.apache.org/activemq/browse/AMQ-1892

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java   (with props)

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java?rev=692449&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java Fri Sep  5 07:10:36 2008
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.activemq.plugin;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class DiscardingDLQBroker extends BrokerFilter {
+    public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
+    private boolean dropTemporaryTopics = true;
+    private boolean dropTemporaryQueues = true;
+    private boolean dropAll = true;
+    private Pattern[] destFilter;
+    private int reportInterval = 1000;
+    private long dropCount = 0;
+
+    public DiscardingDLQBroker(Broker next) {
+        super(next);
+    }
+
+    @Override
+    public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) {
+        if (log.isTraceEnabled()) {
+            try {
+                log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
+            } catch (IOException x) {
+                log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + msgRef != null ? msgRef : null, x);
+            }
+        }
+        boolean dropped = true;
+        Message msg = null;
+        ActiveMQDestination dest = null;
+        String destName = null;
+        try {
+            msg = msgRef.getMessage();
+            dest = msg.getDestination();
+            destName = dest.getPhysicalName();
+        }catch (IOException x) {
+            if (log.isDebugEnabled()) {
+                log.debug("Unable to retrieve message or destination for message going to Dead Letter Queue. message skipped.", x);
+            }
+        }
+
+        if (dest == null || destName == null ) {
+            //do nothing, no need to forward it
+            skipMessage("NULL DESTINATION",msgRef);
+        } else if (dropAll) {
+            //do nothing
+            skipMessage("dropAll",msgRef);
+        } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) {
+            //do nothing
+            skipMessage("dropTemporaryTopics",msgRef);
+        } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) {
+            //do nothing
+            skipMessage("dropTemporaryQueues",msgRef);
+        } else if (destFilter!=null && matches(destName)) {
+            //do nothing
+            skipMessage("dropOnly",msgRef);
+        } else {
+            dropped = false;
+            next.sendToDeadLetterQueue(ctx, msgRef);
+        }
+        if (dropped && getReportInterval()>0) {
+            if ((++dropCount)%getReportInterval() == 0 ) {
+                log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue");
+            }
+        }
+    }
+
+    public boolean matches(String destName) {
+        for (int i=0; destFilter!=null && i<destFilter.length; i++) {
+            if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void skipMessage(String prefix, MessageReference msgRef) {
+        if (log.isDebugEnabled()) {
+            try {
+                String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
+                log.debug(lmsg);
+            }catch (IOException x) {
+                log.debug("Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef:null),x);
+            }
+        }
+    }
+
+    public void setDropTemporaryTopics(boolean dropTemporaryTopics) {
+        this.dropTemporaryTopics = dropTemporaryTopics;
+    }
+
+    public void setDropTemporaryQueues(boolean dropTemporaryQueues) {
+        this.dropTemporaryQueues = dropTemporaryQueues;
+    }
+
+    public void setDropAll(boolean dropAll) {
+        this.dropAll = dropAll;
+    }
+
+    public void setDestFilter(Pattern[] destFilter) {
+        this.destFilter = destFilter;
+    }
+
+    public void setReportInterval(int reportInterval) {
+        this.reportInterval = reportInterval;
+    }
+
+    public boolean isDropTemporaryTopics() {
+        return dropTemporaryTopics;
+    }
+
+    public boolean isDropTemporaryQueues() {
+        return dropTemporaryQueues;
+    }
+
+    public boolean isDropAll() {
+        return dropAll;
+    }
+
+    public Pattern[] getDestFilter() {
+        return destFilter;
+    }
+
+    public int getReportInterval() {
+        return reportInterval;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java?rev=692449&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java Fri Sep  5 07:10:36 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class DiscardingDLQBrokerPlugin implements BrokerPlugin {
+    public DiscardingDLQBrokerPlugin() {
+    }
+
+    public static Log log = LogFactory.getLog(DiscardingDLQBrokerPlugin.class);
+    private boolean dropTemporaryTopics = true;
+    private boolean dropTemporaryQueues = true;
+    private boolean dropAll = true;
+    private String dropOnly;
+    private int reportInterval = 1000;
+
+    /**
+     * Installs the plugin into the interceptor chain of the broker, returning the new intercepted broker to use.
+     * @param broker Broker
+     * @throws Exception
+     * @return Broker
+     * @todo Implement this org.apache.activemq.broker.BrokerPlugin method
+     */
+    public Broker installPlugin(Broker broker) throws Exception {
+        log.info("Installing Discarding Dead Letter Queue broker plugin[dropAll="+isDropAll()+
+                 "; dropTemporaryTopics="+isDropTemporaryTopics()+"; dropTemporaryQueues="+
+                 isDropTemporaryQueues()+"; dropOnly="+getDropOnly()+"; reportInterval="+
+                 getReportInterval()+"]");
+        DiscardingDLQBroker cb = new DiscardingDLQBroker(broker);
+        cb.setDropAll(isDropAll());
+        cb.setDropTemporaryQueues(isDropTemporaryQueues());
+        cb.setDropTemporaryTopics(isDropTemporaryTopics());
+        cb.setDestFilter(getDestFilter());
+        return cb;
+    }
+
+    public boolean isDropAll() {
+        return dropAll;
+    }
+
+    public boolean isDropTemporaryQueues() {
+        return dropTemporaryQueues;
+    }
+
+    public boolean isDropTemporaryTopics() {
+        return dropTemporaryTopics;
+    }
+
+    public String getDropOnly() {
+        return dropOnly;
+    }
+
+    public int getReportInterval() {
+        return reportInterval;
+    }
+
+    public void setDropTemporaryTopics(boolean dropTemporaryTopics) {
+        this.dropTemporaryTopics = dropTemporaryTopics;
+    }
+
+    public void setDropTemporaryQueues(boolean dropTemporaryQueues) {
+        this.dropTemporaryQueues = dropTemporaryQueues;
+    }
+
+    public void setDropAll(boolean dropAll) {
+        this.dropAll = dropAll;
+    }
+
+    public void setDropOnly(String dropOnly) {
+        this.dropOnly = dropOnly;
+    }
+
+    public void setReportInterval(int reportInterval) {
+        this.reportInterval = reportInterval;
+    }
+
+    public Pattern[] getDestFilter() {
+        if (getDropOnly()==null) return null;
+        ArrayList<Pattern> list = new ArrayList<Pattern>();
+        StringTokenizer t = new StringTokenizer(getDropOnly()," ");
+        while (t.hasMoreTokens()) {
+            String s = t.nextToken();
+            if (s!=null && s.trim().length()>0) list.add(Pattern.compile(s));
+        }
+        if (list.size()==0) return null;
+        return list.toArray(new Pattern[0]);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java
------------------------------------------------------------------------------
    svn:eol-style = native