You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/09/05 16:10:37 UTC
svn commit: r692449 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin: ./
DiscardingDLQBroker.java DiscardingDLQBrokerPlugin.java
Author: rajdavies
Date: Fri Sep 5 07:10:36 2008
New Revision: 692449
URL: http://svn.apache.org/viewvc?rev=692449&view=rev
Log:
Patch applied for https://issues.apache.org/activemq/browse/AMQ-1892
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java (with props)
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java?rev=692449&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java Fri Sep 5 07:10:36 2008
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.activemq.plugin;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class DiscardingDLQBroker extends BrokerFilter {
+ public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
+ private boolean dropTemporaryTopics = true;
+ private boolean dropTemporaryQueues = true;
+ private boolean dropAll = true;
+ private Pattern[] destFilter;
+ private int reportInterval = 1000;
+ private long dropCount = 0;
+
+ public DiscardingDLQBroker(Broker next) {
+ super(next);
+ }
+
+ @Override
+ public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) {
+ if (log.isTraceEnabled()) {
+ try {
+ log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
+ } catch (IOException x) {
+ log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + msgRef != null ? msgRef : null, x);
+ }
+ }
+ boolean dropped = true;
+ Message msg = null;
+ ActiveMQDestination dest = null;
+ String destName = null;
+ try {
+ msg = msgRef.getMessage();
+ dest = msg.getDestination();
+ destName = dest.getPhysicalName();
+ }catch (IOException x) {
+ if (log.isDebugEnabled()) {
+ log.debug("Unable to retrieve message or destination for message going to Dead Letter Queue. message skipped.", x);
+ }
+ }
+
+ if (dest == null || destName == null ) {
+ //do nothing, no need to forward it
+ skipMessage("NULL DESTINATION",msgRef);
+ } else if (dropAll) {
+ //do nothing
+ skipMessage("dropAll",msgRef);
+ } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) {
+ //do nothing
+ skipMessage("dropTemporaryTopics",msgRef);
+ } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) {
+ //do nothing
+ skipMessage("dropTemporaryQueues",msgRef);
+ } else if (destFilter!=null && matches(destName)) {
+ //do nothing
+ skipMessage("dropOnly",msgRef);
+ } else {
+ dropped = false;
+ next.sendToDeadLetterQueue(ctx, msgRef);
+ }
+ if (dropped && getReportInterval()>0) {
+ if ((++dropCount)%getReportInterval() == 0 ) {
+ log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue");
+ }
+ }
+ }
+
+ public boolean matches(String destName) {
+ for (int i=0; destFilter!=null && i<destFilter.length; i++) {
+ if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void skipMessage(String prefix, MessageReference msgRef) {
+ if (log.isDebugEnabled()) {
+ try {
+ String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
+ log.debug(lmsg);
+ }catch (IOException x) {
+ log.debug("Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef:null),x);
+ }
+ }
+ }
+
+ public void setDropTemporaryTopics(boolean dropTemporaryTopics) {
+ this.dropTemporaryTopics = dropTemporaryTopics;
+ }
+
+ public void setDropTemporaryQueues(boolean dropTemporaryQueues) {
+ this.dropTemporaryQueues = dropTemporaryQueues;
+ }
+
+ public void setDropAll(boolean dropAll) {
+ this.dropAll = dropAll;
+ }
+
+ public void setDestFilter(Pattern[] destFilter) {
+ this.destFilter = destFilter;
+ }
+
+ public void setReportInterval(int reportInterval) {
+ this.reportInterval = reportInterval;
+ }
+
+ public boolean isDropTemporaryTopics() {
+ return dropTemporaryTopics;
+ }
+
+ public boolean isDropTemporaryQueues() {
+ return dropTemporaryQueues;
+ }
+
+ public boolean isDropAll() {
+ return dropAll;
+ }
+
+ public Pattern[] getDestFilter() {
+ return destFilter;
+ }
+
+ public int getReportInterval() {
+ return reportInterval;
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java?rev=692449&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java Fri Sep 5 07:10:36 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class DiscardingDLQBrokerPlugin implements BrokerPlugin {
+ public DiscardingDLQBrokerPlugin() {
+ }
+
+ public static Log log = LogFactory.getLog(DiscardingDLQBrokerPlugin.class);
+ private boolean dropTemporaryTopics = true;
+ private boolean dropTemporaryQueues = true;
+ private boolean dropAll = true;
+ private String dropOnly;
+ private int reportInterval = 1000;
+
+ /**
+ * Installs the plugin into the interceptor chain of the broker, returning the new intercepted broker to use.
+ * @param broker Broker
+ * @throws Exception
+ * @return Broker
+ * @todo Implement this org.apache.activemq.broker.BrokerPlugin method
+ */
+ public Broker installPlugin(Broker broker) throws Exception {
+ log.info("Installing Discarding Dead Letter Queue broker plugin[dropAll="+isDropAll()+
+ "; dropTemporaryTopics="+isDropTemporaryTopics()+"; dropTemporaryQueues="+
+ isDropTemporaryQueues()+"; dropOnly="+getDropOnly()+"; reportInterval="+
+ getReportInterval()+"]");
+ DiscardingDLQBroker cb = new DiscardingDLQBroker(broker);
+ cb.setDropAll(isDropAll());
+ cb.setDropTemporaryQueues(isDropTemporaryQueues());
+ cb.setDropTemporaryTopics(isDropTemporaryTopics());
+ cb.setDestFilter(getDestFilter());
+ return cb;
+ }
+
+ public boolean isDropAll() {
+ return dropAll;
+ }
+
+ public boolean isDropTemporaryQueues() {
+ return dropTemporaryQueues;
+ }
+
+ public boolean isDropTemporaryTopics() {
+ return dropTemporaryTopics;
+ }
+
+ public String getDropOnly() {
+ return dropOnly;
+ }
+
+ public int getReportInterval() {
+ return reportInterval;
+ }
+
+ public void setDropTemporaryTopics(boolean dropTemporaryTopics) {
+ this.dropTemporaryTopics = dropTemporaryTopics;
+ }
+
+ public void setDropTemporaryQueues(boolean dropTemporaryQueues) {
+ this.dropTemporaryQueues = dropTemporaryQueues;
+ }
+
+ public void setDropAll(boolean dropAll) {
+ this.dropAll = dropAll;
+ }
+
+ public void setDropOnly(String dropOnly) {
+ this.dropOnly = dropOnly;
+ }
+
+ public void setReportInterval(int reportInterval) {
+ this.reportInterval = reportInterval;
+ }
+
+ public Pattern[] getDestFilter() {
+ if (getDropOnly()==null) return null;
+ ArrayList<Pattern> list = new ArrayList<Pattern>();
+ StringTokenizer t = new StringTokenizer(getDropOnly()," ");
+ while (t.hasMoreTokens()) {
+ String s = t.nextToken();
+ if (s!=null && s.trim().length()>0) list.add(Pattern.compile(s));
+ }
+ if (list.size()==0) return null;
+ return list.toArray(new Pattern[0]);
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java
------------------------------------------------------------------------------
svn:eol-style = native