You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/30 18:11:53 UTC

svn commit: r1477709 - in /activemq/trunk: activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ activemq-leveldb-store/src/test/java/org/apache/activemq/le...

Author: chirino
Date: Tue Apr 30 16:11:52 2013
New Revision: 1477709

URL: http://svn.apache.org/r1477709
Log:
Get the replicatedLevelDB element working in the activemq standalone config.

Added:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java
Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
    activemq/trunk/assembly/src/main/descriptors/common-bin.xml

Added: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java?rev=1477709&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/store/leveldb/ReplicatedLevelDBPersistenceAdapter.java Tue Apr 30 16:11:52 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.leveldb;
+
+import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
+
+
+/**
+ * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
+ * LevelDB - Embedded Lightweight Non-Relational Database
+ *
+ * @org.apache.xbean.XBean element="replicatedLevelDB"
+ *
+ */
+public class ReplicatedLevelDBPersistenceAdapter extends ElectingLevelDBStore {
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Tue Apr 30 16:11:52 2013
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.activemq.leveldb.util.Log
 import java.io.File
+import org.apache.activemq.usage.SystemUsage
 
 object ElectingLevelDBStore extends Log {
 
@@ -58,7 +59,7 @@ class ElectingLevelDBStore extends Proxy
   def proxy_target = master
 
   @BeanProperty
-  var zkAddress = "tcp://127.0.0.1:2888"
+  var zkAddress = "127.0.0.1:2181"
   @BeanProperty
   var zkPassword:String = _
   @BeanProperty
@@ -72,8 +73,12 @@ class ElectingLevelDBStore extends Proxy
   var hostname: String = _
   @BeanProperty
   var bind = "tcp://0.0.0.0:61619"
+
   @BeanProperty
-  var minReplica = 1
+  var replicas = 2
+
+  def clusterSizeQuorum = (replicas/2) + 1
+
   @BeanProperty
   var securityToken = ""
 
@@ -116,10 +121,6 @@ class ElectingLevelDBStore extends Proxy
   @BeanProperty
   var monitorStats = false
 
-  def cluster_size_quorum = minReplica + 1
-
-  def cluster_size_max = (minReplica << 2) + 1
-
   var master: MasterLevelDBStore = _
   var slave: SlaveLevelDBStore = _
 
@@ -129,6 +130,11 @@ class ElectingLevelDBStore extends Proxy
 
   var position: Long = -1L
 
+  var usageManager: SystemUsage = _
+  override def setUsageManager(usageManager: SystemUsage) {
+    this.usageManager = usageManager
+  }
+
   def init() {
 
     // Figure out our position in the store.
@@ -250,7 +256,7 @@ class ElectingLevelDBStore extends Proxy
   def create_master() = {
     val master = new MasterLevelDBStore
     configure(master)
-    master.minReplica = minReplica
+    master.replicas = replicas
     master.bind = bind
     master
   }
@@ -278,6 +284,7 @@ class ElectingLevelDBStore extends Proxy
     store.securityToken = securityToken
     store.setBrokerName(brokerName)
     store.setBrokerService(brokerService)
+    store.setUsageManager(usageManager)
   }
 
   def address(port: Int) = {
@@ -287,4 +294,21 @@ class ElectingLevelDBStore extends Proxy
     "tcp://" + hostname + ":" + port
   }
 
+  override def size: Long = {
+    if( master !=null ) {
+      master.size
+    } else if( slave !=null ) {
+      slave.size
+    } else {
+      var rc = 0L
+      if( directory.exists() ) {
+        for( f <- directory.list() ) {
+          if( f.endsWith(LevelDBClient.LOG_SUFFIX)) {
+            rc += f.length
+          }
+        }
+      }
+      rc
+    }
+  }
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala Tue Apr 30 16:11:52 2013
@@ -90,7 +90,7 @@ class MasterElector(store: ElectingLevel
             info("Not enough cluster members connected to elect a new master.")
           case Some(members) =>
 
-            if (members.size < store.cluster_size_quorum) {
+            if (members.size < store.clusterSizeQuorum) {
               info("Not enough cluster members connected to elect a new master.")
             } else {
 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Tue Apr 30 16:11:52 2013
@@ -43,8 +43,10 @@ class MasterLevelDBStore extends LevelDB
 
   @BeanProperty
   var bind = "tcp://0.0.0.0:61619"
+
   @BeanProperty
-  var minReplica = 1
+  var replicas = 2
+  def minSlaveAcks = replicas/2
 
   val slaves = new ConcurrentHashMap[String,SlaveState]()
 
@@ -288,10 +290,10 @@ class MasterLevelDBStore extends LevelDB
   var position_sync = new PositionSync(0L, 0)
 
   def wal_sync_to(position:Long):Unit = {
-    if( minReplica<1 ) {
+    if( minSlaveAcks<1 ) {
       return
     }
-    val position_sync = new PositionSync(position, minReplica)
+    val position_sync = new PositionSync(position, minSlaveAcks)
     this.position_sync = position_sync
     for( slave <- slaves.values() ) {
       slave.check_position_sync
@@ -299,7 +301,7 @@ class MasterLevelDBStore extends LevelDB
 
     while( !position_sync.await(1, TimeUnit.SECONDS) ) {
       val status = slaves.values().map(_.status).mkString(", ")
-      warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minReplica, position, status)
+      warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
     }
   }
 

Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java Tue Apr 30 16:11:52 2013
@@ -215,7 +215,7 @@ public class ElectingLevelDBStoreTest ex
         ElectingLevelDBStore store = new ElectingLevelDBStore();
         store.setSecurityToken("foo");
         store.setLogSize(1023 * 200);
-        store.setMinReplica(1);
+        store.setReplicas(2);
         store.setZkAddress("localhost:" + connector.getLocalPort());
         store.setZkPath("/broker-stores");
         store.setBrokerName("foo");

Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java Tue Apr 30 16:11:52 2013
@@ -17,24 +17,17 @@
 package org.apache.activemq.leveldb.test;
 
 import junit.framework.TestCase;
-import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.leveldb.CountDownFuture;
 import org.apache.activemq.leveldb.LevelDBStore;
 import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
 import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
 import org.apache.activemq.leveldb.util.FileSupport;
-import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
@@ -53,7 +46,7 @@ public class ReplicatedLevelDBStoreTest 
         FileSupport.toRichFile(slaveDir).recursiveDelete();
 
         MasterLevelDBStore master = createMaster(masterDir);
-        master.setMinReplica(1);
+        master.setReplicas(2);
         master.start();
 
         MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
@@ -172,7 +165,7 @@ public class ReplicatedLevelDBStoreTest 
         master.setDirectory(directory);
         master.setBind("tcp://0.0.0.0:0");
         master.setSecurityToken("foo");
-        master.setMinReplica(1);
+        master.setReplicas(2);
         master.setLogSize(1023 * 200);
         return master;
     }

Modified: activemq/trunk/assembly/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/main/descriptors/common-bin.xml?rev=1477709&r1=1477708&r2=1477709&view=diff
==============================================================================
--- activemq/trunk/assembly/src/main/descriptors/common-bin.xml (original)
+++ activemq/trunk/assembly/src/main/descriptors/common-bin.xml Tue Apr 30 16:11:52 2013
@@ -194,6 +194,20 @@
         <include>org.apache.qpid:proton</include>
         <include>org.apache.qpid:proton-api</include>
         <include>org.apache.qpid:proton-jms</include>
+        
+        <!-- activemq-leveldb-store dependencies -->
+        <include>org.scala-lang:scala-library</include>
+        <include>org.fusesource.hawtbuf:hawtbuf-proto</include>
+        <include>org.fusesource.hawtdispatch:*</include>
+        <include>org.iq80.leveldb:*</include>
+        <include>com.google.guava:guava</include>
+        <include>org.fusesource.leveldbjni:*</include>
+        <include>org.fusesource.hawtjni:hawtjni-runtime</include>
+        <include>org.xerial.snappy:*</include>
+        <include>org.iq80.snappy:*</include>
+        <include>org.codehaus.jackson:*</include>
+        <include>org.fusesource.fabric:*</include>
+
       </includes>
     </dependencySet>
     <dependencySet>