You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/06 23:05:07 UTC
svn commit: r1406371 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/broker/jmx/
src/main/java/org/apache/activemq/broker/scheduler/
src/main/java/org/apache/activemq/store/
src/main/java/org/apache/activemq/store/kahadb/scheduler/
src/test/java/org/apache/activemq/broker/scheduler/
Author: chirino
Date: Tue Nov 6 22:05:06 2012
New Revision: 1406371
URL: http://svn.apache.org/viewvc?rev=1406371&view=rev
Log:
Keep the broker.scheduler package free of kahadb impl specifics.
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
- copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
- copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
- copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
- copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Nov 6 22:05:06 2012
@@ -1136,11 +1136,11 @@
<exclude>org/apache/activemq/xbean/JDBCPersistenceXBeanConfigTest.*</exclude>
<exclude>org/apache/activemq/xbean/XBeanXmlTest.*</exclude>
<exclude>org/apache/bugs/AMQ1730Test.*</exclude>
- <exclude>org/apache/bugs/LoadBalanceTest.*</exclude>
- <exclude>org/apache/kahadb/index/BTreeIndexTest.*</exclude>
- <exclude>org/apache/kahadb/index/HashIndexTest.*</exclude>
- <exclude>org/apache/kahadb/index/ListIndexTest.*</exclude>
- <exclude>org/apache/kahadb/util/DataByteArrayInputStreamTest.*</exclude>
+ <exclude>org/apache/activemq/store/kahadb/bugs/LoadBalanceTest.*</exclude>
+ <exclude>org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.*</exclude>
+ <exclude>org/apache/activemq/store/kahadb/disk/index/HashIndexTest.*</exclude>
+ <exclude>org/apache/activemq/store/kahadb/disk/index/ListIndexTest.*</exclude>
+ <exclude>org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.*</exclude>
</excludes>
</configuration>
</plugin>
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java Tue Nov 6 22:05:06 2012
@@ -16,17 +16,14 @@
*/
package org.apache.activemq.broker.jmx;
-import java.io.IOException;
-import java.util.List;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.scheduler.Job;
-import org.apache.activemq.broker.scheduler.JobImpl;
import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.broker.scheduler.JobSupport;
+
+import javax.management.openmbean.*;
+import java.io.IOException;
+import java.util.List;
public class JobSchedulerView implements JobSchedulerViewMBean {
@@ -53,8 +50,8 @@ public class JobSchedulerView implements
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
TabularDataSupport rc = new TabularDataSupport(tt);
- long start = JobImpl.getDataTime(startTime);
- long finish = JobImpl.getDataTime(finishTime);
+ long start = JobSupport.getDataTime(startTime);
+ long finish = JobSupport.getDataTime(finishTime);
List<Job> jobs = this.jobScheduler.getAllJobs(start, finish);
for (Job job : jobs) {
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
@@ -76,7 +73,7 @@ public class JobSchedulerView implements
public String getNextScheduleTime() throws Exception {
long time = this.jobScheduler.getNextScheduleTime();
- return JobImpl.getDateTime(time);
+ return JobSupport.getDateTime(time);
}
public void removeAllJobs() throws Exception {
@@ -85,8 +82,8 @@ public class JobSchedulerView implements
}
public void removeAllJobs(String startTime, String finishTime) throws Exception {
- long start = JobImpl.getDataTime(startTime);
- long finish = JobImpl.getDataTime(finishTime);
+ long start = JobSupport.getDataTime(startTime);
+ long finish = JobSupport.getDataTime(finishTime);
this.jobScheduler.removeAllJobs(start, finish);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java Tue Nov 6 22:05:06 2012
@@ -1,392 +1,20 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.activemq.broker.scheduler;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Page;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.util.LockFile;
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.activemq.Service;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-public class JobSchedulerStore extends ServiceSupport {
- static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
- private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
-
- public static final int CLOSED_STATE = 1;
- public static final int OPEN_STATE = 2;
-
- private File directory;
- PageFile pageFile;
- private Journal journal;
- private LockFile lockFile;
- private boolean failIfDatabaseIsLocked;
- private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
- private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
- private boolean enableIndexWriteAsync = false;
- // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
- MetaData metaData = new MetaData(this);
- final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
- Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
-
- protected class MetaData {
- protected MetaData(JobSchedulerStore store) {
- this.store = store;
- }
- private final JobSchedulerStore store;
- Page<MetaData> page;
- BTreeIndex<Integer, Integer> journalRC;
- BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
-
- void createIndexes(Transaction tx) throws IOException {
- this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
- this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
- }
-
- void load(Transaction tx) throws IOException {
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.storedSchedulers.load(tx);
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.load(tx);
- }
-
- void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
- for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
- Entry<String, JobSchedulerImpl> entry = i.next();
- entry.getValue().load(tx);
- schedulers.put(entry.getKey(), entry.getValue());
- }
- }
-
- public void read(DataInput is) throws IOException {
- this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
- }
-
- public void write(DataOutput os) throws IOException {
- os.writeLong(this.storedSchedulers.getPageId());
- os.writeLong(this.journalRC.getPageId());
-
- }
- }
-
- class MetaDataMarshaller extends VariableMarshaller<MetaData> {
- private final JobSchedulerStore store;
-
- MetaDataMarshaller(JobSchedulerStore store) {
- this.store = store;
- }
- public MetaData readPayload(DataInput dataIn) throws IOException {
- MetaData rc = new MetaData(this.store);
- rc.read(dataIn);
- return rc;
- }
-
- public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
- object.write(dataOut);
- }
- }
-
- class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
- public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
- List<JobLocation> result = new ArrayList<JobLocation>();
- int size = dataIn.readInt();
- for (int i = 0; i < size; i++) {
- JobLocation jobLocation = new JobLocation();
- jobLocation.readExternal(dataIn);
- result.add(jobLocation);
- }
- return result;
- }
-
- public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.size());
- for (JobLocation jobLocation : value) {
- jobLocation.writeExternal(dataOut);
- }
- }
- }
-
- class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
- private final JobSchedulerStore store;
- JobSchedulerMarshaller(JobSchedulerStore store) {
- this.store = store;
- }
- public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
- JobSchedulerImpl result = new JobSchedulerImpl(this.store);
- result.read(dataIn);
- return result;
- }
-
- public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
- js.write(dataOut);
- }
- }
-
- public File getDirectory() {
- return directory;
- }
-
- public void setDirectory(File directory) {
- this.directory = directory;
- }
-
- public long size() {
- if ( !isStarted() ) {
- return 0;
- }
- try {
- return journal.getDiskSize() + pageFile.getDiskSize();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public JobScheduler getJobScheduler(final String name) throws Exception {
- JobSchedulerImpl result = this.schedulers.get(name);
- if (result == null) {
- final JobSchedulerImpl js = new JobSchedulerImpl(this);
- js.setName(name);
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- js.createIndexes(tx);
- js.load(tx);
- metaData.storedSchedulers.put(tx, name, js);
- }
- });
- result = js;
- this.schedulers.put(name, js);
- if (isStarted()) {
- result.start();
- }
- this.pageFile.flush();
- }
- return result;
- }
- synchronized public boolean removeJobScheduler(final String name) throws Exception {
- boolean result = false;
- final JobSchedulerImpl js = this.schedulers.remove(name);
- result = js != null;
- if (result) {
- js.stop();
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- metaData.storedSchedulers.remove(tx, name);
- js.destroy(tx);
- }
- });
- }
- return result;
- }
-
- @Override
- protected synchronized void doStart() throws Exception {
- if (this.directory == null) {
- this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
- }
- IOHelper.mkdirs(this.directory);
- lock();
- this.journal = new Journal();
- this.journal.setDirectory(directory);
- this.journal.setMaxFileLength(getJournalMaxFileLength());
- this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
- this.journal.start();
- this.pageFile = new PageFile(directory, "scheduleDB");
- this.pageFile.setWriteBatchSize(1);
- this.pageFile.load();
-
- this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- if (pageFile.getPageCount() == 0) {
- Page<MetaData> page = tx.allocate();
- assert page.getPageId() == 0;
- page.set(metaData);
- metaData.page = page;
- metaData.createIndexes(tx);
- tx.store(metaData.page, metaDataMarshaller, true);
-
- } else {
- Page<MetaData> page = tx.load(0, metaDataMarshaller);
- metaData = page.get();
- metaData.page = page;
- }
- metaData.load(tx);
- metaData.loadScheduler(tx, schedulers);
- for (JobSchedulerImpl js :schedulers.values()) {
- try {
- js.start();
- } catch (Exception e) {
- JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
- }
- }
- }
- });
-
- this.pageFile.flush();
- LOG.info(this + " started");
- }
-
- @Override
- protected synchronized void doStop(ServiceStopper stopper) throws Exception {
- for (JobSchedulerImpl js : this.schedulers.values()) {
- js.stop();
- }
- if (this.pageFile != null) {
- this.pageFile.unload();
- }
- if (this.journal != null) {
- journal.close();
- }
- if (this.lockFile != null) {
- this.lockFile.unlock();
- }
- this.lockFile = null;
- LOG.info(this + " stopped");
-
- }
-
- synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
- int logId = location.getDataFileId();
- Integer val = this.metaData.journalRC.get(tx, logId);
- int refCount = val != null ? val.intValue() + 1 : 1;
- this.metaData.journalRC.put(tx, logId, refCount);
-
- }
-
- synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
- int logId = location.getDataFileId();
- int refCount = this.metaData.journalRC.get(tx, logId);
- refCount--;
- if (refCount <= 0) {
- this.metaData.journalRC.remove(tx, logId);
- Set<Integer> set = new HashSet<Integer>();
- set.add(logId);
- this.journal.removeDataFiles(set);
- } else {
- this.metaData.journalRC.put(tx, logId, refCount);
- }
-
- }
-
- synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
- ByteSequence result = null;
- result = this.journal.read(location);
- return result;
- }
-
- synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
- return this.journal.write(payload, sync);
- }
-
- private void lock() throws IOException {
- if (lockFile == null) {
- File lockFileName = new File(directory, "lock");
- lockFile = new LockFile(lockFileName, true);
- if (failIfDatabaseIsLocked) {
- lockFile.lock();
- } else {
- while (true) {
- try {
- lockFile.lock();
- break;
- } catch (IOException e) {
- LOG.info("Database " + lockFileName + " is locked... waiting "
- + (DATABASE_LOCKED_WAIT_DELAY / 1000)
- + " seconds for the database to be unlocked. Reason: " + e);
- try {
- Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
- } catch (InterruptedException e1) {
- }
- }
- }
- }
- }
- }
-
- PageFile getPageFile() {
- this.pageFile.isLoaded();
- return this.pageFile;
- }
-
- public boolean isFailIfDatabaseIsLocked() {
- return failIfDatabaseIsLocked;
- }
-
- public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
- this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
- }
-
- public int getJournalMaxFileLength() {
- return journalMaxFileLength;
- }
-
- public void setJournalMaxFileLength(int journalMaxFileLength) {
- this.journalMaxFileLength = journalMaxFileLength;
- }
-
- public int getJournalMaxWriteBatchSize() {
- return journalMaxWriteBatchSize;
- }
-
- public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
- this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
- }
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface JobSchedulerStore extends Service {
+ File getDirectory();
- public boolean isEnableIndexWriteAsync() {
- return enableIndexWriteAsync;
- }
+ void setDirectory(File directory);
- public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
- this.enableIndexWriteAsync = enableIndexWriteAsync;
- }
+ long size();
- @Override
- public String toString() {
- return "JobSchedulerStore:" + this.directory;
- }
+ JobScheduler getJobScheduler(String name) throws Exception;
+ boolean removeJobScheduler(String name) throws Exception;
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java?rev=1406371&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java Tue Nov 6 22:05:06 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class JobSupport {
+ public static String getDateTime(long value) {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Date date = new Date(value);
+ return dateFormat.format(date);
+ }
+
+ public static long getDataTime(String value) throws Exception {
+ DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Date date = dfm.parse(value);
+ return date.getTime();
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Tue Nov 6 22:05:06 2012
@@ -268,7 +268,8 @@ public class SchedulerBroker extends Bro
private JobSchedulerStore getStore() throws Exception {
if (started.get()) {
if (this.store == null) {
- this.store = new JobSchedulerStore();
+ String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
+ this.store = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
this.store.setDirectory(directory);
this.store.start();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java Tue Nov 6 22:05:06 2012
@@ -1,7 +1,6 @@
package org.apache.activemq.store;
import org.apache.activemq.Service;
-import org.apache.activemq.store.kahadb.plist.PListImpl;
import java.io.File;
@@ -13,7 +12,7 @@ public interface PListStore extends Serv
void setDirectory(File directory);
- PListImpl getPList(String name) throws Exception;
+ PList getPList(String name) throws Exception;
boolean removePList(String name) throws Exception;
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java Tue Nov 6 22:05:06 2012
@@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
+
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobSupport;
import org.apache.activemq.util.ByteSequence;
@@ -63,27 +66,12 @@ public class JobImpl implements Job {
public String getNextExecutionTime() {
- return JobImpl.getDateTime(this.jobLocation.getNextTime());
+ return JobSupport.getDateTime(this.jobLocation.getNextTime());
}
public String getStartTime() {
- return JobImpl.getDateTime(getStart());
- }
-
- public static long getDataTime(String value) throws Exception {
- DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- Date date = dfm.parse(value);
- return date.getTime();
- }
-
- public static String getDateTime(long value) {
- DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date date = new Date(value);
- return dateFormat.format(date);
+ return JobSupport.getDateTime(getStart());
}
-
-
}
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java Tue Nov 6 22:05:06 2012
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
import java.io.DataInput;
import java.io.DataOutput;
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java Tue Nov 6 22:05:06 2012
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
import java.io.DataInput;
import java.io.DataOutput;
@@ -28,6 +28,10 @@ import java.util.concurrent.atomic.Atomi
import javax.jms.MessageFormatException;
+import org.apache.activemq.broker.scheduler.CronParser;
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobListener;
+import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
@@ -42,7 +46,7 @@ import org.apache.activemq.store.kahadb.
class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
- final JobSchedulerStore store;
+ final JobSchedulerStoreImpl store;
private final AtomicBoolean running = new AtomicBoolean();
private String name;
BTreeIndex<Long, List<JobLocation>> index;
@@ -51,7 +55,7 @@ class JobSchedulerImpl extends ServiceSu
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final ScheduleTime scheduleTime = new ScheduleTime();
- JobSchedulerImpl(JobSchedulerStore store) {
+ JobSchedulerImpl(JobSchedulerStoreImpl store) {
this.store = store;
}
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java Tue Nov 6 22:05:06 2012
@@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
+import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
@@ -46,8 +48,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-public class JobSchedulerStore extends ServiceSupport {
- static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
+public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
+ static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
public static final int CLOSED_STATE = 1;
@@ -67,10 +69,10 @@ public class JobSchedulerStore extends S
Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
protected class MetaData {
- protected MetaData(JobSchedulerStore store) {
+ protected MetaData(JobSchedulerStoreImpl store) {
this.store = store;
}
- private final JobSchedulerStore store;
+ private final JobSchedulerStoreImpl store;
Page<MetaData> page;
BTreeIndex<Integer, Integer> journalRC;
BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
@@ -114,9 +116,9 @@ public class JobSchedulerStore extends S
}
class MetaDataMarshaller extends VariableMarshaller<MetaData> {
- private final JobSchedulerStore store;
+ private final JobSchedulerStoreImpl store;
- MetaDataMarshaller(JobSchedulerStore store) {
+ MetaDataMarshaller(JobSchedulerStoreImpl store) {
this.store = store;
}
public MetaData readPayload(DataInput dataIn) throws IOException {
@@ -151,8 +153,8 @@ public class JobSchedulerStore extends S
}
class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
- private final JobSchedulerStore store;
- JobSchedulerMarshaller(JobSchedulerStore store) {
+ private final JobSchedulerStoreImpl store;
+ JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
this.store = store;
}
public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
@@ -166,14 +168,17 @@ public class JobSchedulerStore extends S
}
}
+ @Override
public File getDirectory() {
return directory;
}
+ @Override
public void setDirectory(File directory) {
this.directory = directory;
}
+ @Override
public long size() {
if ( !isStarted() ) {
return 0;
@@ -185,6 +190,7 @@ public class JobSchedulerStore extends S
}
}
+ @Override
public JobScheduler getJobScheduler(final String name) throws Exception {
JobSchedulerImpl result = this.schedulers.get(name);
if (result == null) {
@@ -207,6 +213,7 @@ public class JobSchedulerStore extends S
return result;
}
+ @Override
synchronized public boolean removeJobScheduler(final String name) throws Exception {
boolean result = false;
final JobSchedulerImpl js = this.schedulers.remove(name);
@@ -260,7 +267,7 @@ public class JobSchedulerStore extends S
try {
js.start();
} catch (Exception e) {
- JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
+ JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(),e);
}
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java Tue Nov 6 22:05:06 2012
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.scheduler;
import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ByteSequence;
@@ -27,7 +28,7 @@ import java.util.List;
public class JobSchedulerStoreTest extends TestCase {
public void testRestart() throws Exception {
- JobSchedulerStore store = new JobSchedulerStore();
+ JobSchedulerStore store = new JobSchedulerStoreImpl();
File directory = new File("target/test/ScheduledDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java Tue Nov 6 22:05:06 2012
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
@@ -240,7 +241,7 @@ public class JobSchedulerTest {
}
protected void startStore(File directory) throws Exception {
- store = new JobSchedulerStore();
+ store = new JobSchedulerStoreImpl();
store.setDirectory(directory);
store.start();
scheduler = store.getJobScheduler("test");