You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/30 18:11:53 UTC
svn commit: r1477709 - in /activemq/trunk:
activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/
activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/
activemq-leveldb-store/src/test/java/org/apache/activemq/le...
Author: chirino
Date: Tue Apr 30 16:11:52 2013
New Revision: 1477709
URL: http://svn.apache.org/r1477709
Log:
Get the replicatedLevelDB element working in the activemq standalone config.
Added:
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
activemq/trunk/assembly/src/main/descriptors/common-bin.xml
Added: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java?rev=1477709&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java Tue Apr 30 16:11:52 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.leveldb;
+
+import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
+
+
+/**
+ * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
+ * a replicated LevelDB store (LevelDB - Embedded Lightweight Non-Relational Database).
+ * <p>
+ * This class declares no members of its own: it extends {@link ElectingLevelDBStore} so that
+ * the store can be declared with the <code>replicatedLevelDB</code> XBean element named below.
+ *
+ * @org.apache.xbean.XBean element="replicatedLevelDB"
+ *
+ */
+public class ReplicatedLevelDBPersistenceAdapter extends ElectingLevelDBStore {
+}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Tue Apr 30 16:11:52 2013
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.activemq.leveldb.util.Log
import java.io.File
+import org.apache.activemq.usage.SystemUsage
object ElectingLevelDBStore extends Log {
@@ -58,7 +59,7 @@ class ElectingLevelDBStore extends Proxy
def proxy_target = master
@BeanProperty
- var zkAddress = "tcp://127.0.0.1:2888"
+ var zkAddress = "127.0.0.1:2181"
@BeanProperty
var zkPassword:String = _
@BeanProperty
@@ -72,8 +73,12 @@ class ElectingLevelDBStore extends Proxy
var hostname: String = _
@BeanProperty
var bind = "tcp://0.0.0.0:61619"
+
@BeanProperty
- var minReplica = 1
+ var replicas = 2
+
+ def clusterSizeQuorum = (replicas/2) + 1
+
@BeanProperty
var securityToken = ""
@@ -116,10 +121,6 @@ class ElectingLevelDBStore extends Proxy
@BeanProperty
var monitorStats = false
- def cluster_size_quorum = minReplica + 1
-
- def cluster_size_max = (minReplica << 2) + 1
-
var master: MasterLevelDBStore = _
var slave: SlaveLevelDBStore = _
@@ -129,6 +130,11 @@ class ElectingLevelDBStore extends Proxy
var position: Long = -1L
+ // Kept on this store so it can be passed on later via store.setUsageManager(usageManager)
+ // when an underlying store is configured (see the call added further down in this class).
+ var usageManager: SystemUsage = _
+ override def setUsageManager(usageManager: SystemUsage) {
+ this.usageManager = usageManager
+ }
+
def init() {
// Figure out our position in the store.
@@ -250,7 +256,7 @@ class ElectingLevelDBStore extends Proxy
def create_master() = {
val master = new MasterLevelDBStore
configure(master)
- master.minReplica = minReplica
+ master.replicas = replicas
master.bind = bind
master
}
@@ -278,6 +284,7 @@ class ElectingLevelDBStore extends Proxy
store.securityToken = securityToken
store.setBrokerName(brokerName)
store.setBrokerService(brokerService)
+ store.setUsageManager(usageManager)
}
def address(port: Int) = {
@@ -287,4 +294,21 @@ class ElectingLevelDBStore extends Proxy
"tcp://" + hostname + ":" + port
}
+ /**
+  * Reports the size of this store.
+  *
+  * Delegates to the master store if one is set, otherwise to the slave store if one
+  * is set. When neither exists yet, falls back to summing the on-disk length (in bytes)
+  * of every file in `directory` whose name ends with LevelDBClient.LOG_SUFFIX.
+  *
+  * @return the delegated size, or the total bytes of matching log files (0 if the
+  *         directory does not exist or cannot be listed)
+  */
+ override def size: Long = {
+   if( master !=null ) {
+     master.size
+   } else if( slave !=null ) {
+     slave.size
+   } else {
+     var rc = 0L
+     if( directory.exists() ) {
+       // Use listFiles() rather than list(): list() yields file *names* (Strings), so
+       // calling length on them would add the name's character count, not the file size.
+       val files = directory.listFiles()
+       // listFiles() returns null if the path is not a directory or an I/O error occurs.
+       if( files != null ) {
+         for( f <- files ) {
+           if( f.getName.endsWith(LevelDBClient.LOG_SUFFIX)) {
+             rc += f.length
+           }
+         }
+       }
+     }
+     rc
+   }
+ }
}
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala Tue Apr 30 16:11:52 2013
@@ -90,7 +90,7 @@ class MasterElector(store: ElectingLevel
info("Not enough cluster members connected to elect a new master.")
case Some(members) =>
- if (members.size < store.cluster_size_quorum) {
+ if (members.size < store.clusterSizeQuorum) {
info("Not enough cluster members connected to elect a new master.")
} else {
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Tue Apr 30 16:11:52 2013
@@ -43,8 +43,10 @@ class MasterLevelDBStore extends LevelDB
@BeanProperty
var bind = "tcp://0.0.0.0:61619"
+
@BeanProperty
- var minReplica = 1
+ var replicas = 2
+ def minSlaveAcks = replicas/2
val slaves = new ConcurrentHashMap[String,SlaveState]()
@@ -288,10 +290,10 @@ class MasterLevelDBStore extends LevelDB
var position_sync = new PositionSync(0L, 0)
def wal_sync_to(position:Long):Unit = {
- if( minReplica<1 ) {
+ if( minSlaveAcks<1 ) {
return
}
- val position_sync = new PositionSync(position, minReplica)
+ val position_sync = new PositionSync(position, minSlaveAcks)
this.position_sync = position_sync
for( slave <- slaves.values() ) {
slave.check_position_sync
@@ -299,7 +301,7 @@ class MasterLevelDBStore extends LevelDB
while( !position_sync.await(1, TimeUnit.SECONDS) ) {
val status = slaves.values().map(_.status).mkString(", ")
- warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minReplica, position, status)
+ warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
}
}
Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java Tue Apr 30 16:11:52 2013
@@ -215,7 +215,7 @@ public class ElectingLevelDBStoreTest ex
ElectingLevelDBStore store = new ElectingLevelDBStore();
store.setSecurityToken("foo");
store.setLogSize(1023 * 200);
- store.setMinReplica(1);
+ store.setReplicas(2);
store.setZkAddress("localhost:" + connector.getLocalPort());
store.setZkPath("/broker-stores");
store.setBrokerName("foo");
Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java Tue Apr 30 16:11:52 2013
@@ -17,24 +17,17 @@
package org.apache.activemq.leveldb.test;
import junit.framework.TestCase;
-import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
import org.apache.activemq.leveldb.CountDownFuture;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
import org.apache.activemq.leveldb.util.FileSupport;
-import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
@@ -53,7 +46,7 @@ public class ReplicatedLevelDBStoreTest
FileSupport.toRichFile(slaveDir).recursiveDelete();
MasterLevelDBStore master = createMaster(masterDir);
- master.setMinReplica(1);
+ master.setReplicas(2);
master.start();
MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
@@ -172,7 +165,7 @@ public class ReplicatedLevelDBStoreTest
master.setDirectory(directory);
master.setBind("tcp://0.0.0.0:0");
master.setSecurityToken("foo");
- master.setMinReplica(1);
+ master.setReplicas(2);
master.setLogSize(1023 * 200);
return master;
}
Modified: activemq/trunk/assembly/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/main/descriptors/common-bin.xml?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/assembly/src/main/descriptors/common-bin.xml (original)
+++ activemq/trunk/assembly/src/main/descriptors/common-bin.xml Tue Apr 30 16:11:52 2013
@@ -194,6 +194,20 @@
<include>org.apache.qpid:proton</include>
<include>org.apache.qpid:proton-api</include>
<include>org.apache.qpid:proton-jms</include>
+
+ <!-- activemq-leveldb-store dependencies -->
+ <include>org.scala-lang:scala-library</include>
+ <include>org.fusesource.hawtbuf:hawtbuf-proto</include>
+ <include>org.fusesource.hawtdispatch:*</include>
+ <include>org.iq80.leveldb:*</include>
+ <include>com.google.guava:guava</include>
+ <include>org.fusesource.leveldbjni:*</include>
+ <include>org.fusesource.hawtjni:hawtjni-runtime</include>
+ <include>org.xerial.snappy:*</include>
+ <include>org.iq80.snappy:*</include>
+ <include>org.codehaus.jackson:*</include>
+ <include>org.fusesource.fabric:*</include>
+
</includes>
</dependencySet>
<dependencySet>