You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/12/13 22:43:55 UTC

svn commit: r1421557 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/ activemq-broker/src/m...

Author: tabish
Date: Thu Dec 13 21:43:50 2012
New Revision: 1421557

URL: http://svn.apache.org/viewvc?rev=1421557&view=rev
Log:
apply fix for: https://issues.apache.org/jira/browse/AMQ-4068

Updated patch for new layout and fixed size returned to only include the dynamic journal size and not the total disk usage since that accounts for the full log file size and not just the used portion. 

Added:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java
    activemq/trunk/activemq-core/   (props changed)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
    activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Dec 13 21:43:50 2012
@@ -16,18 +16,70 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.ConfigurationException;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.*;
-import org.apache.activemq.broker.region.*;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.ConnectorView;
+import org.apache.activemq.broker.jmx.ConnectorViewMBean;
+import org.apache.activemq.broker.jmx.JmsConnectorView;
+import org.apache.activemq.broker.jmx.JobSchedulerView;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.NetworkConnectorView;
+import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
+import org.apache.activemq.broker.jmx.ProxyConnectorView;
+import org.apache.activemq.broker.jmx.StatusView;
+import org.apache.activemq.broker.jmx.StatusViewMBean;
+import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
+import org.apache.activemq.broker.region.DestinationFactoryImpl;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.virtual.MirroredQueue;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.broker.scheduler.SchedulerBroker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -51,23 +103,21 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.InetAddressUtil;
+import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ThreadPoolUtils;
+import org.apache.activemq.util.TimeUtils;
+import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.io.*;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
  * number of transport connectors, network connectors and a bunch of properties
@@ -125,8 +175,7 @@ public class BrokerService implements Se
     private String[] transportConnectorURIs;
     private String[] networkConnectorURIs;
     private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
-    // to other jms messaging
-    // systems
+    // to other jms messaging systems
     private boolean deleteAllMessagesOnStartup;
     private boolean advisorySupport = true;
     private URI vmConnectorURI;
@@ -173,6 +222,7 @@ public class BrokerService implements Se
     private BrokerContext brokerContext;
     private boolean networkConnectorStartAsync = false;
     private boolean allowTempAutoCreationOnSend;
+    private JobSchedulerStore jobSchedulerStore;
 
     private int offlineDurableSubscriberTimeout = -1;
     private int offlineDurableSubscriberTaskSchedule = 300000;
@@ -332,6 +382,7 @@ public class BrokerService implements Se
         // Set a connection filter so that the connector does not establish loop
         // back connections.
         connector.setConnectionFilter(new ConnectionFilter() {
+            @Override
             public boolean connectTo(URI location) {
                 List<TransportConnector> transportConnectors = getTransportConnectors();
                 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
@@ -463,6 +514,7 @@ public class BrokerService implements Se
         }
     }
 
+    @Override
     public void start() throws Exception {
         if (stopped.get() || !started.compareAndSet(false, true)) {
             // lets just ignore redundant start() calls
@@ -617,6 +669,7 @@ public class BrokerService implements Se
      * @throws Exception
      * @org.apache .xbean.DestroyMethod
      */
+    @Override
     @PreDestroy
     public void stop() throws Exception {
         if (!stopping.compareAndSet(false, true)) {
@@ -662,6 +715,10 @@ public class BrokerService implements Se
             broker = null;
         }
 
+        if (jobSchedulerStore != null) {
+            jobSchedulerStore.stop();
+            jobSchedulerStore = null;
+        }
         if (tempDataStore != null) {
             tempDataStore.stop();
             tempDataStore = null;
@@ -961,11 +1018,13 @@ public class BrokerService implements Se
     public SystemUsage getSystemUsage() {
         try {
             if (systemUsage == null) {
-                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
+
+                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
                 systemUsage.setExecutor(getExecutor());
                 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // 64 MB
                 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
                 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
+                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1000 * 50); // 50 // Gb
                 addService(this.systemUsage);
             }
             return systemUsage;
@@ -1714,6 +1773,36 @@ public class BrokerService implements Se
         this.useTempMirroredQueues = useTempMirroredQueues;
     }
 
+    public synchronized JobSchedulerStore getJobSchedulerStore() {
+        if (jobSchedulerStore == null && isSchedulerSupport()) {
+            try {
+                String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
+                jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
+                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+                configureService(jobSchedulerStore);
+                jobSchedulerStore.start();
+                LOG.info("JobScheduler using directory: " + getSchedulerDirectoryFile());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+        }
+        return jobSchedulerStore;
+    }
+
+    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
+        this.jobSchedulerStore = jobSchedulerStore;
+        configureService(jobSchedulerStore);
+        try {
+            jobSchedulerStore.start();
+        } catch (Exception e) {
+            RuntimeException exception = new RuntimeException(
+                    "Failed to start provided job scheduler store: " + jobSchedulerStore, e);
+            LOG.error(exception.getLocalizedMessage(), e);
+            throw exception;
+        }
+    }
+
     //
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -1829,6 +1918,29 @@ public class BrokerService implements Se
                 }
             }
         }
+
+        if (getJobSchedulerStore() != null) {
+            JobSchedulerStore scheduler = getJobSchedulerStore();
+            File schedulerDir = scheduler.getDirectory();
+            if (schedulerDir != null) {
+
+                String schedulerDirPath = schedulerDir.getAbsolutePath();
+                if (!schedulerDir.isAbsolute()) {
+                    schedulerDir = new File(schedulerDirPath);
+                }
+
+                while (schedulerDir != null && schedulerDir.isDirectory() == false) {
+                    schedulerDir = schedulerDir.getParentFile();
+                }
+                long schedularLimit = usage.getJobSchedulerUsage().getLimit();
+                long dirFreeSpace = schedulerDir.getUsableSpace();
+                if (schedularLimit > dirFreeSpace) {
+                    LOG.warn("Job Schedular Store limit is " + schedularLimit / (1024 * 1024) +
+                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
+                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
+                }
+            }
+        }
     }
 
     public void stopAllConnectors(ServiceStopper stopper) {
@@ -2056,7 +2168,7 @@ public class BrokerService implements Se
      */
     protected Broker addInterceptors(Broker broker) throws Exception {
         if (isSchedulerSupport()) {
-            SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
+            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
             if (isUseJmx()) {
                 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
                 try {
@@ -2283,6 +2395,7 @@ public class BrokerService implements Se
                         10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                         new ThreadFactory() {
                             int count=0;
+                            @Override
                             public Thread newThread(Runnable runnable) {
                                 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
                                 thread.setDaemon(true);
@@ -2301,6 +2414,7 @@ public class BrokerService implements Se
                 }
                 if (networkConnectorStartExecutor != null) {
                     networkConnectorStartExecutor.execute(new Runnable() {
+                        @Override
                         public void run() {
                             try {
                                 LOG.info("Async start of " + connector);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Dec 13 21:43:50 2012
@@ -157,6 +157,14 @@ public class BrokerView implements Broke
     public int getTempPercentUsage() {
        return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
     }
+    
+    public long getJobSchedulerStoreLimit() {
+        return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
+    }
+    
+    public int getJobSchedulerStorePercentUsage() {
+        return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
+    }
 
     public void setStoreLimit(long limit) {
         brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
@@ -165,6 +173,10 @@ public class BrokerView implements Broke
     public void setTempLimit(long limit) {
         brokerService.getSystemUsage().getTempUsage().setLimit(limit);
     }
+    
+    public void setJobSchedulerStoreLimit(long limit) {
+        brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
+    }
 
     public void resetStatistics() {
         safeGetBroker().getDestinationStatistics().reset();

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Dec 13 21:43:50 2012
@@ -107,10 +107,18 @@ public interface BrokerViewMBean extends
     @MBeanInfo("Percent of temp limit used.")
     int getTempPercentUsage();
 
-    @MBeanInfo("Disk limit, in bytes, used for non-persistent messages and temporary date before producers are blocked.")
+    @MBeanInfo("Disk limit, in bytes, used for non-persistent messages and temporary data before producers are blocked.")
     long getTempLimit();
 
     void setTempLimit(@MBeanInfo("bytes") long limit);
+    
+    @MBeanInfo("Percent of job store limit used.")
+    int getJobSchedulerStorePercentUsage();
+
+    @MBeanInfo("Disk limit, in bytes, used for scheduled messages before producers are blocked.")
+    long getJobSchedulerStoreLimit();
+
+    void setJobSchedulerStoreLimit(@MBeanInfo("bytes") long limit);
 
     @MBeanInfo("Messages are synchronized to disk.")
     boolean isPersistent();

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Thu Dec 13 21:43:50 2012
@@ -17,12 +17,14 @@
 package org.apache.activemq.broker.scheduler;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -33,13 +35,15 @@ import org.apache.activemq.command.Produ
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.usage.JobSchedulerUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.TypeConversionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.activemq.util.ByteSequence;
 
 public class SchedulerBroker extends BrokerFilter implements JobListener {
     private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
@@ -50,38 +54,25 @@ public class SchedulerBroker extends Bro
     private final ConnectionContext context = new ConnectionContext();
     private final ProducerId producerId = new ProducerId();
     private File directory;
+    private final SystemUsage systemUsage;
 
-    private JobSchedulerStore store;
+    private final JobSchedulerStore store;
     private JobScheduler scheduler;
 
-    public SchedulerBroker(Broker next, File directory) throws Exception {
+    public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
         super(next);
-        this.directory = directory;
+
+        this.store = store;
         this.producerId.setConnectionId(ID_GENERATOR.generateId());
         this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-        context.setBroker(next);
-        LOG.info("Scheduler using directory: " + directory);
-
+        this.context.setBroker(next);
+        this.systemUsage = brokerService.getSystemUsage();
     }
 
     public synchronized JobScheduler getJobScheduler() throws Exception {
         return new JobSchedulerFacade(this);
     }
 
-    /**
-     * @return the directory
-     */
-    public File getDirectory() {
-        return this.directory;
-    }
-    /**
-     * @param directory
-     *            the directory to set
-     */
-    public void setDirectory(File directory) {
-        this.directory = directory;
-    }
-
     @Override
     public void start() throws Exception {
         this.started.set(true);
@@ -116,9 +107,8 @@ public class SchedulerBroker extends Bro
         Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
 
         String physicalName = messageSend.getDestination().getPhysicalName();
-        boolean schedularManage = physicalName.regionMatches(true, 0,
-                ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
-                ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
+        boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
+            ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
 
         if (schedularManage == true) {
 
@@ -127,14 +117,14 @@ public class SchedulerBroker extends Bro
 
             String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
 
-            if (action != null ) {
+            if (action != null) {
 
                 Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
                 Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
 
                 if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
 
-                    if( startTime != null && endTime != null ) {
+                    if (startTime != null && endTime != null) {
 
                         long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
                         long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
@@ -152,7 +142,7 @@ public class SchedulerBroker extends Bro
                     scheduler.remove(jobId);
                 } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
 
-                    if( startTime != null && endTime != null ) {
+                    if (startTime != null && endTime != null) {
 
                         long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
                         long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
@@ -165,7 +155,7 @@ public class SchedulerBroker extends Bro
             }
 
         } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
-            //clear transaction context
+            // clear transaction context
             Message msg = messageSend.copy();
             msg.setTransactionId(null);
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
@@ -173,7 +163,7 @@ public class SchedulerBroker extends Bro
                 cronEntry = cronValue.toString();
             }
             if (periodValue != null) {
-              period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
+                period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
             }
             if (delayValue != null) {
                 delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
@@ -182,17 +172,17 @@ public class SchedulerBroker extends Bro
             if (repeatValue != null) {
                 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
             }
-            getInternalScheduler().schedule(msg.getMessageId().toString(),
-                    new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
+            getInternalScheduler().schedule(msg.getMessageId().toString(), new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay,
+                period, repeat);
 
         } else {
             super.send(producerExchange, messageSend);
         }
     }
 
+    @Override
     public void scheduledJob(String id, ByteSequence job) {
-        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job
-                .getOffset(), job.getLength());
+        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
         try {
             Message messageSend = (Message) this.wireFormat.unmarshal(packet);
             messageSend.setOriginalTransactionId(null);
@@ -204,11 +194,36 @@ public class SchedulerBroker extends Bro
                 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
             }
 
+            // Check for room in the job scheduler store
+            if (systemUsage.getJobSchedulerUsage() != null) {
+                JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
+                if (usage.isFull()) {
+                    final String logMessage = "Job Scheduler Store is Full (" +
+                        usage.getPercentUsage() + "% of " + usage.getLimit() +
+                        "). Stopping producer (" + messageSend.getProducerId() +
+                        ") to prevent flooding of the job scheduler store." +
+                        " See http://activemq.apache.org/producer-flow-control.html for more info";
+
+                    long start = System.currentTimeMillis();
+                    long nextWarn = start;
+                    while (!usage.waitForSpace(1000)) {
+                        if (context.getStopping().get()) {
+                            throw new IOException("Connection closed, send aborted.");
+                        }
+
+                        long now = System.currentTimeMillis();
+                        if (now >= nextWarn) {
+                            LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
+                            nextWarn = now + 30000l;
+                        }
+                    }
+                }
+            }
+
             if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
                 // create a unique id - the original message could be sent
                 // lots of times
-                messageSend.setMessageId(
-                        new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+                messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
             }
 
             // Add the jobId as a property
@@ -233,7 +248,7 @@ public class SchedulerBroker extends Bro
 
                 long expiration = timeToLive + newTimeStamp;
 
-                if(expiration > oldExpiration) {
+                if (expiration > oldExpiration) {
                     if (timeToLive > 0 && expiration > 0) {
                         messageSend.setExpiration(expiration);
                     }
@@ -257,7 +272,7 @@ public class SchedulerBroker extends Bro
     protected synchronized JobScheduler getInternalScheduler() throws Exception {
         if (this.started.get()) {
             if (this.scheduler == null) {
-                this.scheduler = getStore().getJobScheduler("JMS");
+                this.scheduler = store.getJobScheduler("JMS");
                 this.scheduler.addListener(this);
             }
             return this.scheduler;
@@ -265,21 +280,7 @@ public class SchedulerBroker extends Bro
         return null;
     }
 
-    private JobSchedulerStore getStore() throws Exception {
-        if (started.get()) {
-            if (this.store == null) {
-                String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
-                this.store = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
-                this.store.setDirectory(directory);
-                this.store.start();
-            }
-            return this.store;
-        }
-        return null;
-    }
-
-    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
-            throws Exception {
+    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
 
         org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
         try {

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java?rev=1421557&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java Thu Dec 13 21:43:50 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled. Main use case is manage
+ * memory usage.
+ *
+ * @org.apache.xbean.XBean
+ *
+ */
+public class JobSchedulerUsage extends Usage<JobSchedulerUsage> {
+
+    private JobSchedulerStore store;
+
+    public JobSchedulerUsage() {
+        super(null, null, 1.0f);
+    }
+
+    public JobSchedulerUsage(String name, JobSchedulerStore store) {
+        super(null, name, 1.0f);
+        this.store = store;
+    }
+
+    public JobSchedulerUsage(JobSchedulerUsage parent, String name) {
+        super(parent, name, 1.0f);
+        this.store = parent.store;
+    }
+
+    @Override
+    protected long retrieveUsage() {
+        if (store == null) {
+            return 0;
+        }
+        return store.size();
+    }
+
+    public JobSchedulerStore getStore() {
+        return store;
+    }
+
+    public void setStore(JobSchedulerStore store) {
+        this.store = store;
+        onLimitChange();
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/JobSchedulerUsage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu Dec 13 21:43:50 2012
@@ -19,16 +19,18 @@ package org.apache.activemq.usage;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.store.PListStore;
 import org.apache.activemq.store.PersistenceAdapter;
 
 /**
  * Holder for Usage instances for memory, store and temp files Main use case is
  * manage memory usage.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class SystemUsage implements Service {
 
@@ -38,6 +40,7 @@ public class SystemUsage implements Serv
     private StoreUsage storeUsage;
     private TempUsage tempUsage;
     private ThreadPoolExecutor executor;
+    private JobSchedulerUsage jobSchedulerUsage;
 
     /**
      * True if someone called setSendFailIfNoSpace() on this particular usage
@@ -51,15 +54,16 @@ public class SystemUsage implements Serv
     private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
 
     public SystemUsage() {
-        this("default", null, null);
+        this("default", null, null, null);
     }
 
-    public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore) {
+    public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore, JobSchedulerStore jobSchedulerStore) {
         this.parent = null;
         this.name = name;
         this.memoryUsage = new MemoryUsage(name + ":memory");
         this.storeUsage = new StoreUsage(name + ":store", adapter);
         this.tempUsage = new TempUsage(name + ":temp", tempStore);
+        this.jobSchedulerUsage = new JobSchedulerUsage(name + ":jobScheduler", jobSchedulerStore);
         this.memoryUsage.setExecutor(getExecutor());
         this.storeUsage.setExecutor(getExecutor());
         this.tempUsage.setExecutor(getExecutor());
@@ -72,6 +76,7 @@ public class SystemUsage implements Serv
         this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory");
         this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store");
         this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp");
+        this.jobSchedulerUsage = new JobSchedulerUsage(parent.jobSchedulerUsage, name + ":jobScheduler");
         this.memoryUsage.setExecutor(getExecutor());
         this.storeUsage.setExecutor(getExecutor());
         this.tempUsage.setExecutor(getExecutor());
@@ -102,11 +107,19 @@ public class SystemUsage implements Serv
         return this.tempUsage;
     }
 
+    /**
+     * @return the schedulerUsage
+     */
+    public JobSchedulerUsage getJobSchedulerUsage() {
+        return this.jobSchedulerUsage;
+    }
+
     @Override
     public String toString() {
         return "UsageManager(" + getName() + ")";
     }
 
+    @Override
     public void start() {
         if (parent != null) {
             parent.addChild(this);
@@ -114,8 +127,10 @@ public class SystemUsage implements Serv
         this.memoryUsage.start();
         this.storeUsage.start();
         this.tempUsage.start();
+        this.jobSchedulerUsage.start();
     }
 
+    @Override
     public void stop() {
         if (parent != null) {
             parent.removeChild(this);
@@ -123,6 +138,7 @@ public class SystemUsage implements Serv
         this.memoryUsage.stop();
         this.storeUsage.stop();
         this.tempUsage.stop();
+        this.jobSchedulerUsage.stop();
     }
 
     /**
@@ -185,6 +201,7 @@ public class SystemUsage implements Serv
         this.memoryUsage.setName(name + ":memory");
         this.storeUsage.setName(name + ":store");
         this.tempUsage.setName(name + ":temp");
+        this.jobSchedulerUsage.setName(name + ":jobScheduler");
     }
 
     public void setMemoryUsage(MemoryUsage memoryUsage) {
@@ -210,7 +227,6 @@ public class SystemUsage implements Serv
         }
         this.storeUsage = storeUsage;
         this.storeUsage.setExecutor(executor);
-
     }
 
     public void setTempUsage(TempUsage tempDiskUsage) {
@@ -227,6 +243,20 @@ public class SystemUsage implements Serv
         this.tempUsage.setExecutor(getExecutor());
     }
 
+    public void setJobSchedulerUsage(JobSchedulerUsage jobSchedulerUsage) {
+        if (jobSchedulerUsage.getStore() == null) {
+            jobSchedulerUsage.setStore(this.jobSchedulerUsage.getStore());
+        }
+        if (jobSchedulerUsage.getName() == null) {
+            jobSchedulerUsage.setName(this.jobSchedulerUsage.getName());
+        }
+        if (parent != null) {
+            jobSchedulerUsage.setParent(parent.jobSchedulerUsage);
+        }
+        this.jobSchedulerUsage = jobSchedulerUsage;
+        this.jobSchedulerUsage.setExecutor(getExecutor());
+    }
+
     /**
      * @return the executor
      */
@@ -249,5 +279,8 @@ public class SystemUsage implements Serv
         if (this.tempUsage != null) {
             this.tempUsage.setExecutor(this.executor);
         }
+        if(this.jobSchedulerUsage != null) {
+            this.jobSchedulerUsage.setExecutor(this.executor);
+        }
     }
 }

Propchange: activemq/trunk/activemq-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Dec 13 21:43:50 2012
@@ -26,3 +26,4 @@ shared
 .idea
 *.i??
 derby.log
+createData

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java Thu Dec 13 21:43:50 2012
@@ -29,10 +29,14 @@ import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
 
 public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
 
@@ -212,6 +216,64 @@ public class JmsSchedulerTest extends Em
         producer.send(message);
         producer.close();
     }
+    
+    public void testJobSchedulerStoreUsage() throws Exception {
+        
+        // Shrink the store limit down so we get the producer to block
+        broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
+        
+        
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final long time = 5000;
+        final ProducerThread producer = new ProducerThread(sess, destination) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                Message message = super.createMessage(i);
+                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+                return message;
+            }  
+        };
+        producer.setMessageCount(100);
+        producer.start();
+        
+        MessageConsumer consumer = sess.createConsumer(destination);
+        final CountDownLatch latch = new CountDownLatch(100);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // wait for the producer to block, which should happen immediately, and also wait long 
+        // enough for the delay to elapse.  We should see no deliveries as the send should block
+        // on the first message.
+        Thread.sleep(10000l);
+        
+        assertEquals(100, latch.getCount());
+
+        // Increase the store limit so the producer unblocks.  Everything should enqueue at this point.
+        broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33);
+
+        // Wait long enough that the messages are enqueued and the delivery delay has elapsed.
+        Thread.sleep(10000l);
+
+        // Make sure we sent all the messages we expected to send
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return producer.getSentCount() == producer.getMessageCount();
+            }
+        }, 20000l);
+        
+        assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+        
+        // Make sure we got all the messages we expected to get
+        latch.await(20000l, TimeUnit.MILLISECONDS);
+        
+        assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
+    }
 
     @Override
     protected void setUp() throws Exception {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java?rev=1421557&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java Thu Dec 13 21:43:50 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usage;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreUsageTest.class);
+
+    final int WAIT_TIME_MILLS = 20*1000;
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+
+        IOHelper.mkdirs(schedulerDirectory);
+        IOHelper.deleteChildren(schedulerDirectory);
+
+        BrokerService broker = super.createBroker();
+        broker.setSchedulerSupport(true);
+        broker.setSchedulerDirectoryFile(schedulerDirectory);
+        broker.getSystemUsage().getJobSchedulerUsage().setLimit(7 * 1024);
+        broker.deleteAllMessages();
+        return broker;
+    }
+
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    public void testJmx() throws Exception {
+
+        LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dest = sess.createQueue(this.getClass().getName());
+        final ProducerThread producer = new ProducerThread(sess, dest) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                Message message = super.createMessage(i);
+                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, WAIT_TIME_MILLS / 2);
+                return message;
+            }
+        };
+        producer.setMessageCount(100);
+        producer.start();
+
+        assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit());
+
+        // wait for the producer to block
+        Thread.sleep(WAIT_TIME_MILLS / 2);
+
+        assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100);
+
+        broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33);
+
+        Thread.sleep(WAIT_TIME_MILLS);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return producer.getSentCount() == producer.getMessageCount();
+            }
+        }, WAIT_TIME_MILLS * 2);
+
+        assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+
+        LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
+
+        assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100);
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Dec 13 21:43:50 2012
@@ -89,7 +89,6 @@ import org.apache.activemq.util.Callback
 import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LockFile;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
@@ -218,7 +217,6 @@ public abstract class MessageDatabase ex
     int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
 
     protected AtomicBoolean opened = new AtomicBoolean();
-    private LockFile lockFile;
     private boolean ignoreMissingJournalfiles = false;
     private int indexCacheSize = 10000;
     private boolean checkForCorruptJournalFiles = false;

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java Thu Dec 13 21:43:50 2012
@@ -16,38 +16,39 @@
  */
 package org.apache.activemq.store.kahadb.scheduler;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.broker.scheduler.JobScheduler;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Page;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.util.LockFile;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.LockFile;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
 public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
     static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -58,6 +59,7 @@ public class JobSchedulerStoreImpl exten
     private File directory;
     PageFile pageFile;
     private Journal journal;
+    protected AtomicLong journalSize = new AtomicLong(0);
     private LockFile lockFile;
     private boolean failIfDatabaseIsLocked;
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@@ -72,6 +74,7 @@ public class JobSchedulerStoreImpl exten
         protected MetaData(JobSchedulerStoreImpl store) {
             this.store = store;
         }
+
         private final JobSchedulerStoreImpl store;
         Page<MetaData> page;
         BTreeIndex<Integer, Integer> journalRC;
@@ -111,7 +114,6 @@ public class JobSchedulerStoreImpl exten
         public void write(DataOutput os) throws IOException {
             os.writeLong(this.storedSchedulers.getPageId());
             os.writeLong(this.journalRC.getPageId());
-
         }
     }
 
@@ -121,18 +123,22 @@ public class JobSchedulerStoreImpl exten
         MetaDataMarshaller(JobSchedulerStoreImpl store) {
             this.store = store;
         }
+
+        @Override
         public MetaData readPayload(DataInput dataIn) throws IOException {
             MetaData rc = new MetaData(this.store);
             rc.read(dataIn);
             return rc;
         }
 
+        @Override
         public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
             object.write(dataOut);
         }
     }
 
     class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+        @Override
         public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
             List<JobLocation> result = new ArrayList<JobLocation>();
             int size = dataIn.readInt();
@@ -144,6 +150,7 @@ public class JobSchedulerStoreImpl exten
             return result;
         }
 
+        @Override
         public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
             dataOut.writeInt(value.size());
             for (JobLocation jobLocation : value) {
@@ -154,15 +161,19 @@ public class JobSchedulerStoreImpl exten
 
     class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
         private final JobSchedulerStoreImpl store;
+
         JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
             this.store = store;
         }
+
+        @Override
         public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
             JobSchedulerImpl result = new JobSchedulerImpl(this.store);
             result.read(dataIn);
             return result;
         }
 
+        @Override
         public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
             js.write(dataOut);
         }
@@ -177,14 +188,14 @@ public class JobSchedulerStoreImpl exten
     public void setDirectory(File directory) {
         this.directory = directory;
     }
-    
+
     @Override
     public long size() {
-        if ( !isStarted() ) {
+        if (!isStarted()) {
             return 0;
         }
         try {
-            return journal.getDiskSize() + pageFile.getDiskSize();
+            return journalSize.get() + pageFile.getDiskSize();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -197,6 +208,7 @@ public class JobSchedulerStoreImpl exten
             final JobSchedulerImpl js = new JobSchedulerImpl(this);
             js.setName(name);
             getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     js.createIndexes(tx);
                     js.load(tx);
@@ -221,6 +233,7 @@ public class JobSchedulerStoreImpl exten
         if (result) {
             js.stop();
             getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     metaData.storedSchedulers.remove(tx, name);
                     js.destroy(tx);
@@ -241,12 +254,14 @@ public class JobSchedulerStoreImpl exten
         this.journal.setDirectory(directory);
         this.journal.setMaxFileLength(getJournalMaxFileLength());
         this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
+        this.journal.setSizeAccumulator(this.journalSize);
         this.journal.start();
         this.pageFile = new PageFile(directory, "scheduleDB");
         this.pageFile.setWriteBatchSize(1);
         this.pageFile.load();
 
         this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            @Override
             public void execute(Transaction tx) throws IOException {
                 if (pageFile.getPageCount() == 0) {
                     Page<MetaData> page = tx.allocate();
@@ -263,20 +278,20 @@ public class JobSchedulerStoreImpl exten
                 }
                 metaData.load(tx);
                 metaData.loadScheduler(tx, schedulers);
-                for (JobSchedulerImpl js :schedulers.values()) {
+                for (JobSchedulerImpl js : schedulers.values()) {
                     try {
                         js.start();
                     } catch (Exception e) {
-                        JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(),e);
+                        JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
                     }
-               }
+                }
             }
         });
 
         this.pageFile.flush();
         LOG.info(this + " started");
     }
-    
+
     @Override
     protected synchronized void doStop(ServiceStopper stopper) throws Exception {
         for (JobSchedulerImpl js : this.schedulers.values()) {
@@ -293,7 +308,6 @@ public class JobSchedulerStoreImpl exten
         }
         this.lockFile = null;
         LOG.info(this + " stopped");
-
     }
 
     synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
@@ -301,7 +315,6 @@ public class JobSchedulerStoreImpl exten
         Integer val = this.metaData.journalRC.get(tx, logId);
         int refCount = val != null ? val.intValue() + 1 : 1;
         this.metaData.journalRC.put(tx, logId, refCount);
-
     }
 
     synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
@@ -316,7 +329,6 @@ public class JobSchedulerStoreImpl exten
         } else {
             this.metaData.journalRC.put(tx, logId, refCount);
         }
-
     }
 
     synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
@@ -341,9 +353,8 @@ public class JobSchedulerStoreImpl exten
                         lockFile.lock();
                         break;
                     } catch (IOException e) {
-                        LOG.info("Database " + lockFileName + " is locked... waiting "
-                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
-                                + " seconds for the database to be unlocked. Reason: " + e);
+                        LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
+                            + " seconds for the database to be unlocked. Reason: " + e);
                         try {
                             Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
                         } catch (InterruptedException e1) {
@@ -395,5 +406,4 @@ public class JobSchedulerStoreImpl exten
     public String toString() {
         return "JobSchedulerStore:" + this.directory;
     }
-
 }

Modified: activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java?rev=1421557&r1=1421556&r2=1421557&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java (original)
+++ activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java Thu Dec 13 21:43:50 2012
@@ -74,6 +74,9 @@ public class XBeanBrokerService extends 
         if (usage.getTempUsage().getStore() == null) {
             usage.getTempUsage().setStore(getTempDataStore());
         }
+        if (usage.getJobSchedulerUsage().getStore() == null) {
+            usage.getJobSchedulerUsage().setStore(getJobSchedulerStore());
+        }
     }
 
     /**