You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/12/13 22:43:55 UTC
svn commit: r1421557 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/
activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/
activemq-broker/src/m...
Author: tabish
Date: Thu Dec 13 21:43:50 2012
New Revision: 1421557
URL: http://svn.apache.org/viewvc?rev=1421557&view=rev
Log:
apply fix for: https://issues.apache.org/jira/browse/AMQ-4068
Updated patch for new layout and fixed size returned to only include the dynamic journal size and not the total disk usage since that accounts for the full log file size and not just the used portion.
Added:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java (with props)
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java
activemq/trunk/activemq-core/ (props changed)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Dec 13 21:43:50 2012
@@ -16,18 +16,70 @@
*/
package org.apache.activemq.broker;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.*;
-import org.apache.activemq.broker.region.*;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.ConnectorView;
+import org.apache.activemq.broker.jmx.ConnectorViewMBean;
+import org.apache.activemq.broker.jmx.JmsConnectorView;
+import org.apache.activemq.broker.jmx.JobSchedulerView;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.NetworkConnectorView;
+import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
+import org.apache.activemq.broker.jmx.ProxyConnectorView;
+import org.apache.activemq.broker.jmx.StatusView;
+import org.apache.activemq.broker.jmx.StatusViewMBean;
+import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
+import org.apache.activemq.broker.region.DestinationFactoryImpl;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -51,23 +103,21 @@ import org.apache.activemq.transport.Tra
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.InetAddressUtil;
+import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ThreadPoolUtils;
+import org.apache.activemq.util.TimeUtils;
+import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.io.*;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
* number of transport connectors, network connectors and a bunch of properties
@@ -125,8 +175,7 @@ public class BrokerService implements Se
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
- // to other jms messaging
- // systems
+ // to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
private URI vmConnectorURI;
@@ -173,6 +222,7 @@ public class BrokerService implements Se
private BrokerContext brokerContext;
private boolean networkConnectorStartAsync = false;
private boolean allowTempAutoCreationOnSend;
+ private JobSchedulerStore jobSchedulerStore;
private int offlineDurableSubscriberTimeout = -1;
private int offlineDurableSubscriberTaskSchedule = 300000;
@@ -332,6 +382,7 @@ public class BrokerService implements Se
// Set a connection filter so that the connector does not establish loop
// back connections.
connector.setConnectionFilter(new ConnectionFilter() {
+ @Override
public boolean connectTo(URI location) {
List<TransportConnector> transportConnectors = getTransportConnectors();
for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
@@ -463,6 +514,7 @@ public class BrokerService implements Se
}
}
+ @Override
public void start() throws Exception {
if (stopped.get() || !started.compareAndSet(false, true)) {
// lets just ignore redundant start() calls
@@ -617,6 +669,7 @@ public class BrokerService implements Se
* @throws Exception
* @org.apache .xbean.DestroyMethod
*/
+ @Override
@PreDestroy
public void stop() throws Exception {
if (!stopping.compareAndSet(false, true)) {
@@ -662,6 +715,10 @@ public class BrokerService implements Se
broker = null;
}
+ if (jobSchedulerStore != null) {
+ jobSchedulerStore.stop();
+ jobSchedulerStore = null;
+ }
if (tempDataStore != null) {
tempDataStore.stop();
tempDataStore = null;
@@ -961,11 +1018,13 @@ public class BrokerService implements Se
public SystemUsage getSystemUsage() {
try {
if (systemUsage == null) {
- systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
+
+ systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
systemUsage.setExecutor(getExecutor());
systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // 64 MB
systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
+ systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1000 * 50); // 50 // Gb
addService(this.systemUsage);
}
return systemUsage;
@@ -1714,6 +1773,36 @@ public class BrokerService implements Se
this.useTempMirroredQueues = useTempMirroredQueues;
}
+ public synchronized JobSchedulerStore getJobSchedulerStore() {
+ if (jobSchedulerStore == null && isSchedulerSupport()) {
+ try {
+ String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
+ jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
+ jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+ configureService(jobSchedulerStore);
+ jobSchedulerStore.start();
+ LOG.info("JobScheduler using directory: " + getSchedulerDirectoryFile());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ return jobSchedulerStore;
+ }
+
+ public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
+ this.jobSchedulerStore = jobSchedulerStore;
+ configureService(jobSchedulerStore);
+ try {
+ jobSchedulerStore.start();
+ } catch (Exception e) {
+ RuntimeException exception = new RuntimeException(
+ "Failed to start provided job scheduler store: " + jobSchedulerStore, e);
+ LOG.error(exception.getLocalizedMessage(), e);
+ throw exception;
+ }
+ }
+
//
// Implementation methods
// -------------------------------------------------------------------------
@@ -1829,6 +1918,29 @@ public class BrokerService implements Se
}
}
}
+
+ if (getJobSchedulerStore() != null) {
+ JobSchedulerStore scheduler = getJobSchedulerStore();
+ File schedulerDir = scheduler.getDirectory();
+ if (schedulerDir != null) {
+
+ String schedulerDirPath = schedulerDir.getAbsolutePath();
+ if (!schedulerDir.isAbsolute()) {
+ schedulerDir = new File(schedulerDirPath);
+ }
+
+ while (schedulerDir != null && schedulerDir.isDirectory() == false) {
+ schedulerDir = schedulerDir.getParentFile();
+ }
+ long schedularLimit = usage.getJobSchedulerUsage().getLimit();
+ long dirFreeSpace = schedulerDir.getUsableSpace();
+ if (schedularLimit > dirFreeSpace) {
+ LOG.warn("Job Schedular Store limit is " + schedularLimit / (1024 * 1024) +
+ " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
+ " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
+ }
+ }
+ }
}
public void stopAllConnectors(ServiceStopper stopper) {
@@ -2056,7 +2168,7 @@ public class BrokerService implements Se
*/
protected Broker addInterceptors(Broker broker) throws Exception {
if (isSchedulerSupport()) {
- SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
+ SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
@@ -2283,6 +2395,7 @@ public class BrokerService implements Se
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() {
int count=0;
+ @Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
thread.setDaemon(true);
@@ -2301,6 +2414,7 @@ public class BrokerService implements Se
}
if (networkConnectorStartExecutor != null) {
networkConnectorStartExecutor.execute(new Runnable() {
+ @Override
public void run() {
try {
LOG.info("Async start of " + connector);
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Dec 13 21:43:50 2012
@@ -157,6 +157,14 @@ public class BrokerView implements Broke
public int getTempPercentUsage() {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
+
+ public long getJobSchedulerStoreLimit() {
+ return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
+ }
+
+ public int getJobSchedulerStorePercentUsage() {
+ return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
+ }
public void setStoreLimit(long limit) {
brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
@@ -165,6 +173,10 @@ public class BrokerView implements Broke
public void setTempLimit(long limit) {
brokerService.getSystemUsage().getTempUsage().setLimit(limit);
}
+
+ public void setJobSchedulerStoreLimit(long limit) {
+ brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
+ }
public void resetStatistics() {
safeGetBroker().getDestinationStatistics().reset();
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Dec 13 21:43:50 2012
@@ -107,10 +107,18 @@ public interface BrokerViewMBean extends
@MBeanInfo("Percent of temp limit used.")
int getTempPercentUsage();
- @MBeanInfo("Disk limit, in bytes, used for non-persistent messages and temporary date before producers are blocked.")
+ @MBeanInfo("Disk limit, in bytes, used for non-persistent messages and temporary data before producers are blocked.")
long getTempLimit();
void setTempLimit(@MBeanInfo("bytes") long limit);
+
+ @MBeanInfo("Percent of job store limit used.")
+ int getJobSchedulerStorePercentUsage();
+
+ @MBeanInfo("Disk limit, in bytes, used for scheduled messages before producers are blocked.")
+ long getJobSchedulerStoreLimit();
+
+ void setJobSchedulerStoreLimit(@MBeanInfo("bytes") long limit);
@MBeanInfo("Messages are synchronized to disk.")
boolean isPersistent();
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Thu Dec 13 21:43:50 2012
@@ -17,12 +17,14 @@
package org.apache.activemq.broker.scheduler;
import java.io.File;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
@@ -33,13 +35,15 @@ import org.apache.activemq.command.Produ
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.usage.JobSchedulerUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.TypeConversionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.activemq.util.ByteSequence;
public class SchedulerBroker extends BrokerFilter implements JobListener {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
@@ -50,38 +54,25 @@ public class SchedulerBroker extends Bro
private final ConnectionContext context = new ConnectionContext();
private final ProducerId producerId = new ProducerId();
private File directory;
+ private final SystemUsage systemUsage;
- private JobSchedulerStore store;
+ private final JobSchedulerStore store;
private JobScheduler scheduler;
- public SchedulerBroker(Broker next, File directory) throws Exception {
+ public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
super(next);
- this.directory = directory;
+
+ this.store = store;
this.producerId.setConnectionId(ID_GENERATOR.generateId());
this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- context.setBroker(next);
- LOG.info("Scheduler using directory: " + directory);
-
+ this.context.setBroker(next);
+ this.systemUsage = brokerService.getSystemUsage();
}
public synchronized JobScheduler getJobScheduler() throws Exception {
return new JobSchedulerFacade(this);
}
- /**
- * @return the directory
- */
- public File getDirectory() {
- return this.directory;
- }
- /**
- * @param directory
- * the directory to set
- */
- public void setDirectory(File directory) {
- this.directory = directory;
- }
-
@Override
public void start() throws Exception {
this.started.set(true);
@@ -116,9 +107,8 @@ public class SchedulerBroker extends Bro
Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
String physicalName = messageSend.getDestination().getPhysicalName();
- boolean schedularManage = physicalName.regionMatches(true, 0,
- ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
- ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
+ boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
+ ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
if (schedularManage == true) {
@@ -127,14 +117,14 @@ public class SchedulerBroker extends Bro
String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
- if (action != null ) {
+ if (action != null) {
Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
- if( startTime != null && endTime != null ) {
+ if (startTime != null && endTime != null) {
long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
@@ -152,7 +142,7 @@ public class SchedulerBroker extends Bro
scheduler.remove(jobId);
} else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
- if( startTime != null && endTime != null ) {
+ if (startTime != null && endTime != null) {
long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
@@ -165,7 +155,7 @@ public class SchedulerBroker extends Bro
}
} else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
- //clear transaction context
+ // clear transaction context
Message msg = messageSend.copy();
msg.setTransactionId(null);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
@@ -173,7 +163,7 @@ public class SchedulerBroker extends Bro
cronEntry = cronValue.toString();
}
if (periodValue != null) {
- period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
+ period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
}
if (delayValue != null) {
delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
@@ -182,17 +172,17 @@ public class SchedulerBroker extends Bro
if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
- getInternalScheduler().schedule(msg.getMessageId().toString(),
- new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
+ getInternalScheduler().schedule(msg.getMessageId().toString(), new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay,
+ period, repeat);
} else {
super.send(producerExchange, messageSend);
}
}
+ @Override
public void scheduledJob(String id, ByteSequence job) {
- org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job
- .getOffset(), job.getLength());
+ org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
try {
Message messageSend = (Message) this.wireFormat.unmarshal(packet);
messageSend.setOriginalTransactionId(null);
@@ -204,11 +194,36 @@ public class SchedulerBroker extends Bro
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
+ // Check for room in the job scheduler store
+ if (systemUsage.getJobSchedulerUsage() != null) {
+ JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
+ if (usage.isFull()) {
+ final String logMessage = "Job Scheduler Store is Full (" +
+ usage.getPercentUsage() + "% of " + usage.getLimit() +
+ "). Stopping producer (" + messageSend.getProducerId() +
+ ") to prevent flooding of the job scheduler store." +
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
+
+ long start = System.currentTimeMillis();
+ long nextWarn = start;
+ while (!usage.waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+
+ long now = System.currentTimeMillis();
+ if (now >= nextWarn) {
+ LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
+ nextWarn = now + 30000l;
+ }
+ }
+ }
+ }
+
if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
// create a unique id - the original message could be sent
// lots of times
- messageSend.setMessageId(
- new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+ messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
}
// Add the jobId as a property
@@ -233,7 +248,7 @@ public class SchedulerBroker extends Bro
long expiration = timeToLive + newTimeStamp;
- if(expiration > oldExpiration) {
+ if (expiration > oldExpiration) {
if (timeToLive > 0 && expiration > 0) {
messageSend.setExpiration(expiration);
}
@@ -257,7 +272,7 @@ public class SchedulerBroker extends Bro
protected synchronized JobScheduler getInternalScheduler() throws Exception {
if (this.started.get()) {
if (this.scheduler == null) {
- this.scheduler = getStore().getJobScheduler("JMS");
+ this.scheduler = store.getJobScheduler("JMS");
this.scheduler.addListener(this);
}
return this.scheduler;
@@ -265,21 +280,7 @@ public class SchedulerBroker extends Bro
return null;
}
- private JobSchedulerStore getStore() throws Exception {
- if (started.get()) {
- if (this.store == null) {
- String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
- this.store = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
- this.store.setDirectory(directory);
- this.store.start();
- }
- return this.store;
- }
- return null;
- }
-
- protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
- throws Exception {
+ protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
try {
Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java?rev=1421557&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java Thu Dec 13 21:43:50 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled. Main use case is manage
+ * memory usage.
+ *
+ * @org.apache.xbean.XBean
+ *
+ */
+public class JobSchedulerUsage extends Usage<JobSchedulerUsage> {
+
+ private JobSchedulerStore store;
+
+ public JobSchedulerUsage() {
+ super(null, null, 1.0f);
+ }
+
+ public JobSchedulerUsage(String name, JobSchedulerStore store) {
+ super(null, name, 1.0f);
+ this.store = store;
+ }
+
+ public JobSchedulerUsage(JobSchedulerUsage parent, String name) {
+ super(parent, name, 1.0f);
+ this.store = parent.store;
+ }
+
+ @Override
+ protected long retrieveUsage() {
+ if (store == null) {
+ return 0;
+ }
+ return store.size();
+ }
+
+ public JobSchedulerStore getStore() {
+ return store;
+ }
+
+ public void setStore(JobSchedulerStore store) {
+ this.store = store;
+ onLimitChange();
+ }
+}
Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu Dec 13 21:43:50 2012
@@ -19,16 +19,18 @@ package org.apache.activemq.usage;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
+
import org.apache.activemq.Service;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
/**
* Holder for Usage instances for memory, store and temp files Main use case is
* manage memory usage.
- *
+ *
* @org.apache.xbean.XBean
- *
+ *
*/
public class SystemUsage implements Service {
@@ -38,6 +40,7 @@ public class SystemUsage implements Serv
private StoreUsage storeUsage;
private TempUsage tempUsage;
private ThreadPoolExecutor executor;
+ private JobSchedulerUsage jobSchedulerUsage;
/**
* True if someone called setSendFailIfNoSpace() on this particular usage
@@ -51,15 +54,16 @@ public class SystemUsage implements Serv
private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage() {
- this("default", null, null);
+ this("default", null, null, null);
}
- public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore) {
+ public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore, JobSchedulerStore jobSchedulerStore) {
this.parent = null;
this.name = name;
this.memoryUsage = new MemoryUsage(name + ":memory");
this.storeUsage = new StoreUsage(name + ":store", adapter);
this.tempUsage = new TempUsage(name + ":temp", tempStore);
+ this.jobSchedulerUsage = new JobSchedulerUsage(name + ":jobScheduler", jobSchedulerStore);
this.memoryUsage.setExecutor(getExecutor());
this.storeUsage.setExecutor(getExecutor());
this.tempUsage.setExecutor(getExecutor());
@@ -72,6 +76,7 @@ public class SystemUsage implements Serv
this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory");
this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store");
this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp");
+ this.jobSchedulerUsage = new JobSchedulerUsage(parent.jobSchedulerUsage, name + ":jobScheduler");
this.memoryUsage.setExecutor(getExecutor());
this.storeUsage.setExecutor(getExecutor());
this.tempUsage.setExecutor(getExecutor());
@@ -102,11 +107,19 @@ public class SystemUsage implements Serv
return this.tempUsage;
}
+ /**
+ * @return the schedulerUsage
+ */
+ public JobSchedulerUsage getJobSchedulerUsage() {
+ return this.jobSchedulerUsage;
+ }
+
@Override
public String toString() {
return "UsageManager(" + getName() + ")";
}
+ @Override
public void start() {
if (parent != null) {
parent.addChild(this);
@@ -114,8 +127,10 @@ public class SystemUsage implements Serv
this.memoryUsage.start();
this.storeUsage.start();
this.tempUsage.start();
+ this.jobSchedulerUsage.start();
}
+ @Override
public void stop() {
if (parent != null) {
parent.removeChild(this);
@@ -123,6 +138,7 @@ public class SystemUsage implements Serv
this.memoryUsage.stop();
this.storeUsage.stop();
this.tempUsage.stop();
+ this.jobSchedulerUsage.stop();
}
/**
@@ -185,6 +201,7 @@ public class SystemUsage implements Serv
this.memoryUsage.setName(name + ":memory");
this.storeUsage.setName(name + ":store");
this.tempUsage.setName(name + ":temp");
+ this.jobSchedulerUsage.setName(name + ":jobScheduler");
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
@@ -210,7 +227,6 @@ public class SystemUsage implements Serv
}
this.storeUsage = storeUsage;
this.storeUsage.setExecutor(executor);
-
}
public void setTempUsage(TempUsage tempDiskUsage) {
@@ -227,6 +243,20 @@ public class SystemUsage implements Serv
this.tempUsage.setExecutor(getExecutor());
}
+ public void setJobSchedulerUsage(JobSchedulerUsage jobSchedulerUsage) {
+ if (jobSchedulerUsage.getStore() == null) {
+ jobSchedulerUsage.setStore(this.jobSchedulerUsage.getStore());
+ }
+ if (jobSchedulerUsage.getName() == null) {
+ jobSchedulerUsage.setName(this.jobSchedulerUsage.getName());
+ }
+ if (parent != null) {
+ jobSchedulerUsage.setParent(parent.jobSchedulerUsage);
+ }
+ this.jobSchedulerUsage = jobSchedulerUsage;
+ this.jobSchedulerUsage.setExecutor(getExecutor());
+ }
+
/**
* @return the executor
*/
@@ -249,5 +279,8 @@ public class SystemUsage implements Serv
if (this.tempUsage != null) {
this.tempUsage.setExecutor(this.executor);
}
+ if(this.jobSchedulerUsage != null) {
+ this.jobSchedulerUsage.setExecutor(this.executor);
+ }
}
}
Propchange: activemq/trunk/activemq-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Dec 13 21:43:50 2012
@@ -26,3 +26,4 @@ shared
.idea
*.i??
derby.log
+createData
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java Thu Dec 13 21:43:50 2012
@@ -29,10 +29,14 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
@@ -212,6 +216,64 @@ public class JmsSchedulerTest extends Em
producer.send(message);
producer.close();
}
+
+ public void testJobSchedulerStoreUsage() throws Exception {
+
+ // Shrink the store limit down so we get the producer to block
+ broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
+
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ Connection conn = factory.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final long time = 5000;
+ final ProducerThread producer = new ProducerThread(sess, destination) {
+ @Override
+ protected Message createMessage(int i) throws Exception {
+ Message message = super.createMessage(i);
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+ return message;
+ }
+ };
+ producer.setMessageCount(100);
+ producer.start();
+
+ MessageConsumer consumer = sess.createConsumer(destination);
+ final CountDownLatch latch = new CountDownLatch(100);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+
+ // wait for the producer to block, which should happen immediately, and also wait long
+ // enough for the delay to elapse. We should see no deliveries as the send should block
+ // on the first message.
+ Thread.sleep(10000l);
+
+ assertEquals(100, latch.getCount());
+
+ // Increase the store limit so the producer unblocks. Everything should enqueue at this point.
+ broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33);
+
+ // Wait long enough that the messages are enqueued and the delivery delay has elapsed.
+ Thread.sleep(10000l);
+
+ // Make sure we sent all the messages we expected to send
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return producer.getSentCount() == producer.getMessageCount();
+ }
+ }, 20000l);
+
+ assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+
+ // Make sure we got all the messages we expected to get
+ latch.await(20000l, TimeUnit.MILLISECONDS);
+
+ assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
+ }
@Override
protected void setUp() throws Exception {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java?rev=1421557&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java Thu Dec 13 21:43:50 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usage;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreUsageTest.class);
+
+ final int WAIT_TIME_MILLS = 20*1000;
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ File schedulerDirectory = new File("target/scheduler");
+
+ IOHelper.mkdirs(schedulerDirectory);
+ IOHelper.deleteChildren(schedulerDirectory);
+
+ BrokerService broker = super.createBroker();
+ broker.setSchedulerSupport(true);
+ broker.setSchedulerDirectoryFile(schedulerDirectory);
+ broker.getSystemUsage().getJobSchedulerUsage().setLimit(7 * 1024);
+ broker.deleteAllMessages();
+ return broker;
+ }
+
+ @Override
+ protected boolean isPersistent() {
+ return true;
+ }
+
+ public void testJmx() throws Exception {
+
+ LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ Connection conn = factory.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination dest = sess.createQueue(this.getClass().getName());
+ final ProducerThread producer = new ProducerThread(sess, dest) {
+ @Override
+ protected Message createMessage(int i) throws Exception {
+ Message message = super.createMessage(i);
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, WAIT_TIME_MILLS / 2);
+ return message;
+ }
+ };
+ producer.setMessageCount(100);
+ producer.start();
+
+ assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit());
+
+ // wait for the producer to block
+ Thread.sleep(WAIT_TIME_MILLS / 2);
+
+ assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100);
+
+ broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33);
+
+ Thread.sleep(WAIT_TIME_MILLS);
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return producer.getSentCount() == producer.getMessageCount();
+ }
+ }, WAIT_TIME_MILLS * 2);
+
+ assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+
+ LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
+
+ assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100);
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Dec 13 21:43:50 2012
@@ -89,7 +89,6 @@ import org.apache.activemq.util.Callback
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
@@ -218,7 +217,6 @@ public abstract class MessageDatabase ex
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
protected AtomicBoolean opened = new AtomicBoolean();
- private LockFile lockFile;
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java Thu Dec 13 21:43:50 2012
@@ -16,38 +16,39 @@
*/
package org.apache.activemq.store.kahadb.scheduler;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.util.LockFile;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.LockFile;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -58,6 +59,7 @@ public class JobSchedulerStoreImpl exten
private File directory;
PageFile pageFile;
private Journal journal;
+ protected AtomicLong journalSize = new AtomicLong(0);
private LockFile lockFile;
private boolean failIfDatabaseIsLocked;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@@ -72,6 +74,7 @@ public class JobSchedulerStoreImpl exten
protected MetaData(JobSchedulerStoreImpl store) {
this.store = store;
}
+
private final JobSchedulerStoreImpl store;
Page<MetaData> page;
BTreeIndex<Integer, Integer> journalRC;
@@ -111,7 +114,6 @@ public class JobSchedulerStoreImpl exten
public void write(DataOutput os) throws IOException {
os.writeLong(this.storedSchedulers.getPageId());
os.writeLong(this.journalRC.getPageId());
-
}
}
@@ -121,18 +123,22 @@ public class JobSchedulerStoreImpl exten
MetaDataMarshaller(JobSchedulerStoreImpl store) {
this.store = store;
}
+
+ @Override
public MetaData readPayload(DataInput dataIn) throws IOException {
MetaData rc = new MetaData(this.store);
rc.read(dataIn);
return rc;
}
+ @Override
public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
object.write(dataOut);
}
}
class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+ @Override
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
int size = dataIn.readInt();
@@ -144,6 +150,7 @@ public class JobSchedulerStoreImpl exten
return result;
}
+ @Override
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (JobLocation jobLocation : value) {
@@ -154,15 +161,19 @@ public class JobSchedulerStoreImpl exten
class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
private final JobSchedulerStoreImpl store;
+
JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
this.store = store;
}
+
+ @Override
public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
JobSchedulerImpl result = new JobSchedulerImpl(this.store);
result.read(dataIn);
return result;
}
+ @Override
public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
js.write(dataOut);
}
@@ -177,14 +188,14 @@ public class JobSchedulerStoreImpl exten
public void setDirectory(File directory) {
this.directory = directory;
}
-
+
@Override
public long size() {
- if ( !isStarted() ) {
+ if (!isStarted()) {
return 0;
}
try {
- return journal.getDiskSize() + pageFile.getDiskSize();
+ return journalSize.get() + pageFile.getDiskSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -197,6 +208,7 @@ public class JobSchedulerStoreImpl exten
final JobSchedulerImpl js = new JobSchedulerImpl(this);
js.setName(name);
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
js.createIndexes(tx);
js.load(tx);
@@ -221,6 +233,7 @@ public class JobSchedulerStoreImpl exten
if (result) {
js.stop();
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
metaData.storedSchedulers.remove(tx, name);
js.destroy(tx);
@@ -241,12 +254,14 @@ public class JobSchedulerStoreImpl exten
this.journal.setDirectory(directory);
this.journal.setMaxFileLength(getJournalMaxFileLength());
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
+ this.journal.setSizeAccumulator(this.journalSize);
this.journal.start();
this.pageFile = new PageFile(directory, "scheduleDB");
this.pageFile.setWriteBatchSize(1);
this.pageFile.load();
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
if (pageFile.getPageCount() == 0) {
Page<MetaData> page = tx.allocate();
@@ -263,20 +278,20 @@ public class JobSchedulerStoreImpl exten
}
metaData.load(tx);
metaData.loadScheduler(tx, schedulers);
- for (JobSchedulerImpl js :schedulers.values()) {
+ for (JobSchedulerImpl js : schedulers.values()) {
try {
js.start();
} catch (Exception e) {
- JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(),e);
+ JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
}
- }
+ }
}
});
this.pageFile.flush();
LOG.info(this + " started");
}
-
+
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
for (JobSchedulerImpl js : this.schedulers.values()) {
@@ -293,7 +308,6 @@ public class JobSchedulerStoreImpl exten
}
this.lockFile = null;
LOG.info(this + " stopped");
-
}
synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
@@ -301,7 +315,6 @@ public class JobSchedulerStoreImpl exten
Integer val = this.metaData.journalRC.get(tx, logId);
int refCount = val != null ? val.intValue() + 1 : 1;
this.metaData.journalRC.put(tx, logId, refCount);
-
}
synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
@@ -316,7 +329,6 @@ public class JobSchedulerStoreImpl exten
} else {
this.metaData.journalRC.put(tx, logId, refCount);
}
-
}
synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
@@ -341,9 +353,8 @@ public class JobSchedulerStoreImpl exten
lockFile.lock();
break;
} catch (IOException e) {
- LOG.info("Database " + lockFileName + " is locked... waiting "
- + (DATABASE_LOCKED_WAIT_DELAY / 1000)
- + " seconds for the database to be unlocked. Reason: " + e);
+ LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
+ + " seconds for the database to be unlocked. Reason: " + e);
try {
Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
@@ -395,5 +406,4 @@ public class JobSchedulerStoreImpl exten
public String toString() {
return "JobSchedulerStore:" + this.directory;
}
-
}
Modified: activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java (original)
+++ activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java Thu Dec 13 21:43:50 2012
@@ -74,6 +74,9 @@ public class XBeanBrokerService extends
if (usage.getTempUsage().getStore() == null) {
usage.getTempUsage().setStore(getTempDataStore());
}
+ if (usage.getJobSchedulerUsage().getStore() == null) {
+ usage.getJobSchedulerUsage().setStore(getJobSchedulerStore());
+ }
}
/**