Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/11 20:40:02 UTC

svn commit: r889781 [1/3] - in /activemq/sandbox/activemq-apollo/activemq-kahadb-replication: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/replication/ sr...

Author: chirino
Date: Fri Dec 11 19:39:58 2009
New Revision: 889781

URL: http://svn.apache.org/viewvc?rev=889781&view=rev
Log:
resurrecting the kahadb-replication idea.. going to try to work on it for the next couple of weeks.  Fingers crossed I can get it working.


Added:
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml   (with props)
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha-broker.xml
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha.xml
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha-broker.xml
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha.xml
    activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/log4j.properties

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README Fri Dec 11 19:39:58 2009
@@ -0,0 +1,18 @@
+=======================================================================
+ ActiveMQ KahaDB Replication: HA replication for the KahaDB store
+=======================================================================
+
+Goals:
+  * Provide fast replication of the KahaDB Store
+  * Support multiple replication slaves
+  * Support dynamically adding new replication slaves to a running master
+  * Support multiple master/slave selection strategies
+  
+Status:
+  * ZooKeeper-based master/slave selection strategies implemented
+    TODO: May need to have the master watch for a ZooKeeper disconnect
+          to force the master to go offline.
+  * A pure Broadcast/Multicast master/slave selection strategy
+    implemented using ActiveBlaze.
+    TODO: validate that split brain cannot occur.
+    
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml Fri Dec 11 19:39:58 2009
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.3.0</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.activemq</groupId>
+  <artifactId>activemq-kahadb-replication</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>1.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>avalon-framework</groupId>
+          <artifactId>avalon-framework</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>logkit</groupId>
+          <artifactId>logkit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.14</version>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.activemq.protobuf</groupId>
+      <artifactId>activemq-protobuf</artifactId>
+    </dependency>  
+          
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-spring</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-core</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-beans</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-context</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <optional>true</optional>
+    </dependency>
+        
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activeblaze</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <optional>true</optional>
+    </dependency>
+     
+  </dependencies>
+  
+  <repositories>
+    <repository>
+      <id>chirino-zk-repo</id>
+      <name>Private ZooKeeper Repo</name>
+      <url>http://people.apache.org/~chirino/zk-repo/</url>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.xbean</groupId>
+        <artifactId>maven-xbean-plugin</artifactId>
+        <version>3.4</version>
+        <executions>
+          <execution>
+            <configuration>
+              <namespace>http://activemq.apache.org/schema/kahadb</namespace>
+            </configuration>
+            <goals>
+              <goal>mapping</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.5</source>
+          <target>1.5</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.activemq.protobuf</groupId>
+        <artifactId>activemq-protobuf</artifactId>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>pertest</forkMode>
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <argLine>-Xmx512M</argLine>
+
+           <systemProperties>
+            <property>
+              <name>org.apache.activemq.default.directory.prefix</name>
+              <value>target/</value>
+            </property>
+          </systemProperties>
+
+          <includes>
+            <include>**/*Test.*</include>
+          </includes>
+          <excludes>
+            <exclude>**/perf/*</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+public interface ClusterListener {
+	
+	public void onClusterChange(ClusterState config);
+
+}
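
For illustration only, a minimal listener might just report role changes; this
sketch assumes nothing beyond the ClusterListener and ClusterState types added
in this commit:

    import org.apache.kahadb.replication.ClusterListener;
    import org.apache.kahadb.replication.ClusterState;

    // A minimal ClusterListener sketch: it only logs the current cluster roles.
    public class LoggingClusterListener implements ClusterListener {
        public void onClusterChange(ClusterState config) {
            // getMaster() may be null while no master has been elected yet.
            System.out.println("master=" + config.getMaster()
                    + " slaves=" + config.getSlaves());
        }
    }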

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterState {
+
+	private List<String> slaves = new ArrayList<String>();
+	private String master;
+	
+	public List<String> getSlaves() {
+		return slaves;
+	}
+	public void setSlaves(List<String> slaves) {
+		this.slaves = slaves;
+	}
+	public String getMaster() {
+		return master;
+	}
+	public void setMaster(String master) {
+		this.master = master;
+	}
+	
+}
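
ClusterState is a plain value object. To see the two fields in action, here is
a sketch of how a fixed cluster layout could be described (the commit's
StaticClusterStateManager test class presumably builds something similar,
though its API is not shown here, and the kdbr URIs are assumptions):

    // Sketch: describing a fixed two-node cluster.
    ClusterState state = new ClusterState();
    state.setMaster("kdbr://localhost:60001");
    state.getSlaves().add("kdbr://localhost:60002");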

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import org.apache.activemq.Service;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+/**
+ * This interface is used by the ReplicationService to know when
+ * it should switch between Slave and Master mode. 
+ * 
+ * @author chirino
+ */
+public interface ClusterStateManager extends Service {
+
+    /**
+     * Adds a ClusterListener which is used to get notifications
+     * of changes in the cluster state.
+     * @param listener
+     */
+	void addListener(ClusterListener listener);
+	
+	/**
+	 * Removes a previously added ClusterListener
+	 * @param listener
+	 */
+	void removeListener(ClusterListener listener);
+
+	/**
+	 * Adds a member to the cluster.  Adding a member does not mean it is online.
+	 * Some ClusterStateManagers may keep track of a persistent membership list
+	 * so that they can determine if there are enough nodes online to form a quorum
+	 * for the purposes of electing a master.
+	 * 
+	 * @param node
+	 */
+    public void addMember(final String node);
+    
+    /**
+     * Removes a previously added member.
+     * 
+     * @param node
+     */
+    public void removeMember(final String node);
+
+    /**
+     * Updates the status of the local node.
+     * 
+     * @param status
+     */
+    public void setMemberStatus(final PBClusterNodeStatus status);
+}
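
To make the contract concrete, here is a hedged usage sketch. It assumes a
ClusterStateManager implementation has already been constructed (for example
the ZooKeeper one added by this commit, whose configuration is not shown here),
and it uses only the methods declared above plus the PBClusterNodeStatus
setters that ReplicationService.createStatus() uses further below (State is
org.apache.kahadb.replication.pb.PBClusterNodeStatus.State); the journal
position that createStatus() would also record is omitted:

    // Sketch only: wiring a node into a cluster manager.
    void joinCluster(ClusterStateManager cluster, final String nodeUri) throws Exception {
        cluster.addListener(new ClusterListener() {
            public void onClusterChange(ClusterState config) {
                boolean master = nodeUri.equals(config.getMaster());
                System.out.println(nodeUri + (master ? " is the master" : " is a slave"));
            }
        });
        cluster.start();             // ClusterStateManager extends Service
        cluster.addMember(nodeUri);  // announce ourselves as a cluster member
        PBClusterNodeStatus status = new PBClusterNodeStatus();
        status.setConnectUri(nodeUri);
        status.setState(State.SLAVE_UNCONNECTED);  // every node starts as an unconnected slave
        cluster.setMemberStatus(status);
    }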

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * This broker service does not actually do anything.  It allows you to create an activemq.xml file
+ * which does not actually start a broker.  It is used in conjunction with the ReplicationService,
+ * since that service will create the actual BrokerService.
+ * 
+ * @author chirino
+ * @org.apache.xbean.XBean element="kahadbReplicationBroker"
+ */
+public class ReplicationBrokerService extends BrokerService {
+
+    ReplicationService replicationService;
+    AtomicBoolean started = new AtomicBoolean();
+
+    public ReplicationService getReplicationService() {
+        return replicationService;
+    }
+
+    public void setReplicationService(ReplicationService replicationService) {
+        this.replicationService = replicationService;
+    }
+    
+    @Override
+    public void start() throws Exception {
+        if( started.compareAndSet(false, true) ) {
+            replicationService.start();
+        }
+    }
+    
+    @Override
+    public void stop() throws Exception {
+        if( started.compareAndSet(true, false) ) {
+            replicationService.stop();
+        }
+    }
+}
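
Programmatically, the same wiring looks roughly like this. This is a sketch
using only the two setters shown above; in practice the ha-broker.xml files
added in this commit presumably configure it through the
kahadbReplicationBroker XBean element instead:

    // Sketch: the broker element delegates its whole lifecycle to the ReplicationService.
    ReplicationService replication = new ReplicationService();
    // ... configure uri, directory and cluster as in ReplicationService below ...

    ReplicationBrokerService broker = new ReplicationBrokerService();
    broker.setReplicationService(replication);
    broker.start();  // starts the ReplicationService, not an actual broker
    // ... later ...
    broker.stop();   // stops the ReplicationService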

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import org.apache.kahadb.replication.pb.PBHeader;
+
+public class ReplicationFrame {
+	
+	PBHeader header;
+	Object payload;
+	
+	public PBHeader getHeader() {
+		return header;
+	}
+	public void setHeader(PBHeader header) {
+		this.header = header;
+	}
+	
+	public Object getPayload() {
+		return payload;
+	}
+	public void setPayload(Object payload) {
+		this.payload = payload;
+	}
+	
+}
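
A frame is just a typed header plus an arbitrary payload. As a small
illustration (mirroring how ReplicationMaster builds its JOURNAL_UPDATE frames
below, and assuming, as the master-side handler suggests, that SLAVE_ONLINE
needs no payload), a slave announcing it is online might send:

    // Sketch: a SLAVE_ONLINE frame carries only a typed header in this example.
    ReplicationFrame frame = new ReplicationFrame();
    frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
    transport.oneway(frame);  // 'transport' is a kdbr Transport connected to the master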

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.Callback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.journal.ReplicationTarget;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.util.ByteSequence;
+
+
+public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
+
+    private static final Log LOG = LogFactory.getLog(ReplicationService.class);
+
+    private final ReplicationService replicationService;
+
+    private Object serverMutex = new Object() {
+    };
+    private TransportServer server;
+
+    private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
+
+    private final AtomicInteger nextSnapshotId = new AtomicInteger();
+    private final Object requestMutex = new Object(){};
+    private Location requestLocation;
+    private CountDownLatch requestLatch;
+    private int minimumReplicas;
+    
+    public ReplicationMaster(ReplicationService replicationService) {
+        this.replicationService = replicationService;
+        minimumReplicas = replicationService.getMinimumReplicas();
+    }
+
+    public void start() throws Exception {
+        synchronized (serverMutex) {
+            server = TransportFactory.bind(new URI(replicationService.getUri()));
+            server.setAcceptListener(new TransportAcceptListener() {
+                public void onAccept(Transport transport) {
+                    try {
+                        synchronized (serverMutex) {
+                            ReplicationSession session = new ReplicationSession(transport);
+                            session.start();
+                            addSession(session);
+                        }
+                    } catch (Exception e) {
+                        LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
+                    }
+                }
+
+                public void onAcceptError(Exception e) {
+                    LOG.info("Could not accept replication connection: " + e, e);
+                }
+            });
+            server.start();
+        }
+        replicationService.getStore().getJournal().setReplicationTarget(this);
+    }
+
+    boolean isStarted() {
+        synchronized (serverMutex) {
+            return server != null;
+        }
+    }
+
+    public void stop() throws Exception {
+        replicationService.getStore().getJournal().setReplicationTarget(null);
+        synchronized (serverMutex) {
+            if (server != null) {
+                server.stop();
+                server = null;
+            }
+        }
+
+        ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            sessionsSnapshot = this.sessions;
+        }
+
+        for (ReplicationSession session : sessionsSnapshot) {
+            session.stop();
+        }
+    }
+
+    protected void addSession(ReplicationSession session) {
+        synchronized (sessions) {
+            sessions = new ArrayList<ReplicationSession>(sessions);
+            sessions.add(session);
+        }
+    }
+
+    protected void removeSession(ReplicationSession session) {
+        synchronized (sessions) {
+            sessions = new ArrayList<ReplicationSession>(sessions);
+            sessions.remove(session);
+        }
+    }
+
+    public void onClusterChange(ClusterState config) {
+        // For now, we don't really care about changes in the slave config..
+    }
+
+    /**
+     * This is called by the Journal so that we can replicate the update to the
+     * slaves.
+     */
+    public void replicate(Location location, ByteSequence sequence, boolean sync) {
+        ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            // Hurrah for copy on write..
+            sessionsSnapshot = this.sessions;
+        }
+
+        // We may be able to always async replicate...
+        if (minimumReplicas==0) {
+            sync = false;
+        }
+        CountDownLatch latch = null;
+        if (sync) {
+            latch = new CountDownLatch(minimumReplicas);
+            synchronized (requestMutex) {
+                requestLatch = latch;
+                requestLocation = location;
+            }
+        }
+
+        ReplicationFrame frame = null;
+        for (ReplicationSession session : sessionsSnapshot) {
+            if (session.subscribedToJournalUpdates.get()) {
+
+                // Lazily create the frame since we may not have any available
+                // sessions to send to.
+                if (frame == null) {
+                    frame = new ReplicationFrame();
+                    frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
+                    PBJournalUpdate payload = new PBJournalUpdate();
+                    payload.setLocation(ReplicationSupport.convert(location));
+                    payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
+                    payload.setSendAck(sync);
+                    frame.setPayload(payload);
+                }
+
+                // TODO: use async send threads so that the frames can be pushed
+                // out in parallel.
+                try {
+                    session.setLastUpdateLocation(location);
+                    session.transport.oneway(frame);
+                } catch (IOException e) {
+                    session.onException(e);
+                }
+            }
+        }
+
+        if (sync) {
+            try {
+                int timeout = 500;
+                int counter = 0;
+                while (true) {
+                    if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
+                        return;
+                    }
+                    if (!isStarted()) {
+                        return;
+                    }
+                    counter++;
+                    if ((counter % 10) == 0) {
+                        LOG.warn("KahaDB is waiting for slave to come online. " + (timeout * counter / 1000.f) + " seconds have elapsed.");
+                    }
+                }
+            } catch (InterruptedException ignore) {
+            }
+        }
+
+    }
+
+    private void ackAllFromTo(Location lastAck, Location newAck) {
+        Location l;
+        java.util.concurrent.CountDownLatch latch;
+        synchronized (requestMutex) {
+            latch = requestLatch;
+            l = requestLocation;
+        }
+        if( l == null ) {
+            return;
+        }
+        
+        if (lastAck == null || lastAck.compareTo(l) < 0) {
+            if (newAck != null && l.compareTo(newAck) <= 0) {
+                latch.countDown();
+                return;
+            }
+        } 
+    }
+
+    class ReplicationSession implements Service, TransportListener {
+
+        private final Transport transport;
+        private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+        private boolean stopped;
+
+        private File snapshotFile;
+        private HashSet<Integer> journalReplicatedFiles;
+        private Location lastAckLocation;
+        private Location lastUpdateLocation;
+        private boolean online;
+
+        public ReplicationSession(Transport transport) {
+            this.transport = transport;
+        }
+
+        synchronized public void setLastUpdateLocation(Location lastUpdateLocation) {
+            this.lastUpdateLocation = lastUpdateLocation;
+        }
+
+        public void start() throws Exception {
+            transport.setTransportListener(this);
+            transport.start();
+        }
+
+        synchronized public void stop() throws Exception {
+            if (!stopped) {
+                stopped = true;
+                deleteReplicationData();
+                transport.stop();
+            }
+        }
+
+        synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
+            Location ack = ReplicationSupport.convert(location);
+            if (online) {
+                ackAllFromTo(lastAckLocation, ack);
+            }
+            lastAckLocation = ack;
+        }
+
+        synchronized private void onSlaveOnline(ReplicationFrame frame) {
+            deleteReplicationData();
+            online = true;
+            if (lastAckLocation != null) {
+                ackAllFromTo(null, lastAckLocation);
+            }
+
+        }
+
+        public void onCommand(Object command) {
+            try {
+                ReplicationFrame frame = (ReplicationFrame)command;
+                switch (frame.getHeader().getType()) {
+                case SLAVE_INIT:
+                    onSlaveInit(frame, (PBSlaveInit)frame.getPayload());
+                    break;
+                case SLAVE_ONLINE:
+                    onSlaveOnline(frame);
+                    break;
+                case FILE_TRANSFER:
+                    onFileTransfer(frame, (PBFileInfo)frame.getPayload());
+                    break;
+                case JOURNAL_UPDATE_ACK:
+                    onJournalUpdateAck(frame, (PBJournalLocation)frame.getPayload());
+                    break;
+                }
+            } catch (Exception e) {
+                LOG.warn("Slave request failed: " + e, e);
+                failed(e);
+            }
+        }
+
+        public void onException(IOException error) {
+            failed(error);
+        }
+
+        public void failed(Exception error) {
+            try {
+                stop();
+            } catch (Exception ignore) {
+            }
+        }
+
+        public void transportInterupted() {
+        }
+
+        public void transportResumed() {
+        }
+
+        private void deleteReplicationData() {
+            if (snapshotFile != null) {
+                snapshotFile.delete();
+                snapshotFile = null;
+            }
+            if (journalReplicatedFiles != null) {
+                journalReplicatedFiles = null;
+                updateJournalReplicatedFiles();
+            }
+        }
+
+        private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
+
+            // Start sending journal updates to the slave.
+            subscribedToJournalUpdates.set(true);
+
+            // We could look at the slave state sent in the slaveInit and decide
+            // that a full sync is not needed..
+            // but for now we will do a full sync every time.
+            ReplicationFrame rc = new ReplicationFrame();
+            final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
+            rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
+            rc.setPayload(rcPayload);
+
+            // Setup a map of all the files that the slave has
+            final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
+            for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
+                slaveFiles.put(info.getName(), info);
+            }
+
+            final KahaDBStore store = replicationService.getStore();
+            store.checkpoint(new Callback() {
+                public void execute() throws Exception {
+                    // This callback is executed once the checkpoint is
+                    // completed and all data has been synced to disk,
+                    // but while a lock is still held on the store so
+                    // that no updates are done while we are in this
+                    // method.
+
+                    KahaDBStore store = replicationService.getStore();
+                    if (lastAckLocation == null) {
+                        lastAckLocation = store.getLastUpdatePosition();
+                    }
+
+                    int snapshotId = nextSnapshotId.incrementAndGet();
+                    File file = store.getPageFile().getFile();
+                    File dir = replicationService.getTempReplicationDir();
+                    dir.mkdirs();
+                    snapshotFile = new File(dir, "snapshot-" + snapshotId);
+
+                    journalReplicatedFiles = new HashSet<Integer>();
+
+                    // Store the list of files associated with the snapshot.
+                    ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
+                    Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+                    for (DataFile df : journalFiles.values()) {
+                        // Look at what the slave has so that only the missing
+                        // bits are transferred.
+                        String name = "journal-" + df.getDataFileId();
+                        PBFileInfo slaveInfo = slaveFiles.remove(name);
+
+                        // Use the checksum info to see if the slave has the
+                        // file already.. Checksums are less accurate for
+                        // small amounts of data.. so ignore small files.
+                        if (slaveInfo != null && slaveInfo.getEnd() > 1024 * 512) {
+                            // If the slave's file checksum matches what we
+                            // have..
+                            if (ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd()) == slaveInfo.getChecksum()) {
+                                // Is our file longer? Then we need to continue
+                                // transferring the rest of the file.
+                                if (df.getLength() > slaveInfo.getEnd()) {
+                                    snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
+                                    journalReplicatedFiles.add(df.getDataFileId());
+                                    continue;
+                                } else {
+                                    // No need to replicate this file.
+                                    continue;
+                                }
+                            }
+                        }
+
+                        // If we got here then it means we need to transfer the
+                        // whole file.
+                        snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
+                        journalReplicatedFiles.add(df.getDataFileId());
+                    }
+
+                    PBFileInfo info = new PBFileInfo();
+                    info.setName("database");
+                    info.setSnapshotId(snapshotId);
+                    info.setStart(0);
+                    info.setEnd(file.length());
+                    info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
+                    snapshotInfos.add(info);
+
+                    rcPayload.setCopyFilesList(snapshotInfos);
+                    ArrayList<String> deleteFiles = new ArrayList<String>();
+                    slaveFiles.remove("database");
+                    for (PBFileInfo unused : slaveFiles.values()) {
+                        deleteFiles.add(unused.getName());
+                    }
+                    rcPayload.setDeleteFilesList(deleteFiles);
+
+                    updateJournalReplicatedFiles();
+                }
+
+            });
+
+            transport.oneway(rc);
+        }
+
+        private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
+            File file = replicationService.getReplicationFile(fileInfo.getName());
+            long payloadSize = fileInfo.getEnd() - fileInfo.getStart();
+
+            if (file.length() < fileInfo.getStart() + payloadSize) {
+                throw new IOException("Requested replication file dose not have enough data.");
+            }
+
+            ReplicationFrame rc = new ReplicationFrame();
+            rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
+
+            FileInputStream is = new FileInputStream(file);
+            rc.setPayload(is);
+            try {
+                is.skip(fileInfo.getStart());
+                transport.oneway(rc);
+            } finally {
+                try {
+                    is.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Looks at all the journal files currently being replicated and informs
+     * KahaDB so that it does not delete them while the replication is occurring.
+     */
+    private void updateJournalReplicatedFiles() {
+        HashSet<Integer> files = replicationService.getStore().getJournalFilesBeingReplicated();
+        files.clear();
+
+        ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            // Hurrah for copy on write..
+            sessionsSnapshot = this.sessions;
+        }
+
+        for (ReplicationSession session : sessionsSnapshot) {
+            if (session.journalReplicatedFiles != null) {
+                files.addAll(session.journalReplicatedFiles);
+            }
+        }
+    }
+
+}
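
The synchronous path of replicate() above boils down to a simple quorum
pattern: push the update to every subscribed session, then block on a
CountDownLatch sized to minimumReplicas until enough slaves acknowledge the
journal location. A distilled, self-contained sketch of that pattern follows
(hypothetical names, no KahaDB types):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Distilled sketch of the quorum-ack pattern in ReplicationMaster.replicate().
    class QuorumWriter {
        private volatile CountDownLatch pending;

        void write(int minimumReplicas) throws InterruptedException {
            if (minimumReplicas == 0) {
                return;  // nothing to wait for: replication is purely async
            }
            CountDownLatch latch = new CountDownLatch(minimumReplicas);
            pending = latch;
            // ... send the journal update to all subscribed slave sessions ...
            while (!latch.await(500, TimeUnit.MILLISECONDS)) {
                // replicate() also returns here if the master has been stopped,
                // and logs a warning every ~5 seconds while slaves are offline.
            }
        }

        // Called when a slave acks a location at or past the pending request.
        void onSlaveAck() {
            CountDownLatch latch = pending;
            if (latch != null) {
                latch.countDown();
            }
        }
    }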

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+
+/**
+ * Handles interfacing with the ClusterStateManager and activates the
+ * slave or master facets of the broker.
+ * 
+ * @author chirino
+ * @org.apache.xbean.XBean element="kahadbReplication"
+ */
+public class ReplicationService implements Service, ClusterListener {
+
+    private static final String JOURNAL_PREFIX = "journal-";
+
+    private static final Log LOG = LogFactory.getLog(ReplicationService.class);
+
+    private String brokerURI = "xbean:broker.xml";
+    private File directory = new File(IOHelper.getDefaultDataDirectory());
+    private File tempReplicationDir;
+    private String uri;
+    private ClusterStateManager cluster;
+    private int minimumReplicas=1;
+    
+    private KahaDBStore store;
+
+    private ClusterState clusterState;
+    private BrokerService brokerService;
+    private ReplicationMaster master;
+    private ReplicationSlave slave;
+
+    public void start() throws Exception {
+        if( cluster==null ) {
+            throw new IllegalArgumentException("The cluster field has not been set.");
+        }
+        // The cluster will let us know about the cluster configuration,
+        // which lets us decide if we are going to be a slave or a master.
+        getStore().open();
+        cluster.addListener(this);
+        cluster.start();
+        
+        cluster.addMember(getUri());
+        cluster.setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
+    }
+
+    public PBClusterNodeStatus createStatus(State state) throws IOException {
+        final PBClusterNodeStatus status = new PBClusterNodeStatus();
+        status.setConnectUri(getUri());
+        status.setLastUpdate(ReplicationSupport.convert(getStore().getLastUpdatePosition()));
+        status.setState(state);
+        return status;
+    }
+
+    public void stop() throws Exception {
+        cluster.removeListener(this);
+        cluster.stop();
+        stopMaster();
+        stopSlave();
+        getStore().close();
+    }
+
+    public void onClusterChange(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        try {
+            synchronized (cluster) {
+                if (areWeTheSlave(clusterState)) {
+                    // If we were the master we need to stop the master
+                    // service..
+                    stopMaster();
+                    // If the slave service was not yet started.. start it up.
+                    if( clusterState.getMaster()==null ) {
+                        stopSlave();
+                    } else {
+                        startSlave();
+                        slave.onClusterChange(clusterState);
+                    }
+                } else if (areWeTheMaster(clusterState)) {
+                    // If we were the slave we need to stop the slave service..
+                    stopSlave();
+                    // If the master service was not yet started.. start it up.
+                    startMaster();
+                    master.onClusterChange(clusterState);
+                } else {
+                    // We were not part of the configuration (neither master
+                    // nor slave), so we have to shut down any master or slave
+                    // services that may have been running.
+                    stopMaster();
+                    stopSlave();
+                    getCluster().setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
+
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Unexpected Error: " + e, e);
+        }
+    }
+
+    private void startMaster() throws IOException, Exception {
+        if (master == null) {
+            LOG.info("Starting replication master.");
+            getCluster().setMemberStatus(createStatus(State.MASTER));
+            brokerService = createBrokerService();
+            brokerService.start();
+            master = new ReplicationMaster(this);
+            master.start();
+        }
+    }
+
+    private void stopSlave() throws Exception {
+        if (slave != null) {
+            LOG.info("Stopping replication slave.");
+            slave.stop();
+            slave = null;
+        }
+    }
+
+    private void startSlave() throws Exception {
+        if (slave == null) {
+            LOG.info("Starting replication slave.");
+            slave = new ReplicationSlave(this);
+            slave.start();
+        }
+    }
+
+    private void stopMaster() throws Exception, IOException {
+        if (master != null) {
+            LOG.info("Stopping replication master.");
+            master.stop();
+            master = null;
+            brokerService.stop();
+            brokerService = null;
+            // Stopping the broker service actually stops the store too..
+            // so we need to open it back up.
+            getStore().open();
+        }
+    }
+
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    private BrokerService createBrokerService() throws Exception {
+        BrokerService rc = BrokerFactory.createBroker(brokerURI);
+        rc.setPersistenceAdapter(getStore());
+        return rc;
+    }
+
+    public ClusterState getClusterState() {
+        return clusterState;
+    }
+
+    private boolean areWeTheSlave(ClusterState config) {
+        return config.getSlaves().contains(uri);
+    }
+
+    private boolean areWeTheMaster(ClusterState config) {
+        return uri.equals(config.getMaster());
+    }
+    
+    ///////////////////////////////////////////////////////////////////
+    // Accessors
+    ///////////////////////////////////////////////////////////////////
+
+    public File getReplicationFile(String fn) throws IOException {
+        if (fn.equals("database")) {
+            return getStore().getPageFile().getFile();
+        }
+        if (fn.startsWith(JOURNAL_PREFIX)) {
+            int id;
+            try {
+                id = Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
+            } catch (NumberFormatException e) {
+                throw new IOException("Unknown replication file name: " + fn);
+            }
+            return getStore().getJournal().getFile(id);
+        } else {
+            throw new IOException("Unknown replication file name: " + fn);
+        }
+    }
+
+
+    public File getTempReplicationFile(String fn, int snapshotId) throws IOException {
+        if (fn.equals("database")) {
+            return new File(getTempReplicationDir(), "database-" + snapshotId);
+        }
+        if (fn.startsWith(JOURNAL_PREFIX)) {
+            int id;
+            try {
+                id = Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
+            } catch (NumberFormatException e) {
+                throw new IOException("Unknown replication file name: " + fn);
+            }
+            return new File(getTempReplicationDir(), fn);
+        } else {
+            throw new IOException("Unknown replication file name: " + fn);
+        }
+    }
+
+    public boolean isMaster() {
+        return master != null;
+    }
+
+    public File getTempReplicationDir() {
+        if (tempReplicationDir == null) {
+            tempReplicationDir = new File(getStore().getDirectory(), "replication");
+        }
+        return tempReplicationDir;
+    }
+    public void setTempReplicationDir(File tempReplicationDir) {
+        this.tempReplicationDir = tempReplicationDir;
+    }
+
+    public KahaDBStore getStore() {
+        if (store == null) {
+            store = new KahaDBStore();
+            store.setDirectory(directory);
+        }
+        return store;
+    }
+    public void setStore(KahaDBStore store) {
+        this.store = store;
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public String getBrokerURI() {
+        return brokerURI;
+    }
+    public void setBrokerURI(String brokerURI) {
+        this.brokerURI = brokerURI;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+    public void setUri(String nodeId) {
+        this.uri = nodeId;
+    }
+    
+    public ClusterStateManager getCluster() {
+        return cluster;
+    }
+    public void setCluster(ClusterStateManager cluster) {
+        this.cluster = cluster;
+    }
+
+    public int getMinimumReplicas() {
+        return minimumReplicas;
+    }
+
+    public void setMinimumReplicas(int minimumReplicas) {
+        this.minimumReplicas = minimumReplicas;
+    }
+
+}
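
For context, wiring the service up looks roughly like the minimal sketch below. It is not part of this commit: it assumes the default constructor, the ActiveMQ Service start()/stop() lifecycle, a kdbr:// replication endpoint scheme, and a ClusterStateManager implementation (the MyClusterStateManager name is made up); only the setters visible above are used.

    ReplicationService service = new ReplicationService();
    service.setDirectory(new File("data/replicated-store")); // where the KahaDBStore lives
    service.setBrokerURI("xbean:activemq.xml");              // handed to BrokerFactory.createBroker()
    service.setUri("kdbr://localhost:60001");                // this node's replication endpoint (assumed scheme)
    service.setMinimumReplicas(1);
    service.setCluster(new MyClusterStateManager());         // hypothetical implementation
    service.start();  // comes up as master or slave depending on the cluster state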

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
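+/**
+ * Slave side of the replication protocol: on start it connects to the master,
+ * sends a SLAVE_INIT frame listing the store files it already has, bulk-copies
+ * whatever the SLAVE_INIT_RESPONSE asks for over FILE_TRANSFER sessions, then
+ * reports SLAVE_ONLINE and applies real-time JOURNAL_UPDATE frames (acking
+ * with JOURNAL_UPDATE_ACK when the master requests it).
+ */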
+public class ReplicationSlave implements Service, ClusterListener, TransportListener {
+	
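+	// Maximum number of concurrent bulk file transfer connections to the master.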
+	private static final int MAX_TRANSFER_SESSIONS = 1;
+
+	private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
+
+	private final ReplicationService replicationServer;
+	private Transport transport;
+
+	// Used to bulk transfer the master state over to the slave..
+	private final Object transferMutex = new Object();
+	private final LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
+	private final LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
+	private final HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();	
+	private PBSlaveInitResponse initResponse;
+	private boolean online;
+	private final AtomicBoolean started = new AtomicBoolean();
+	
+	// Used to do real time journal updates..
+	int journalUpdateFileId;
+	RandomAccessFile journalUpdateFile;
+	private String master;
+	
+	public ReplicationSlave(ReplicationService replicationServer) {
+		this.replicationServer = replicationServer;
+	}
+
+	public void start() throws Exception {
+		if( started.compareAndSet(false, true) ) {
+			onClusterChange(replicationServer.getClusterState());
+		}
+	}
+	
+	public void stop() throws Exception {
+		if( started.compareAndSet(true, false)) {
+			doStop();
+		}
+	}
+
+	private void doStart() throws Exception {
+		synchronized (transferMutex) {
+			
+			// Failure recovery might be trying to start us back up, but the
+			// replication service may have already stopped us, in which case
+			// there is no need to start.
+			if( !started.get() ) {
+				return;
+			}
+			
+			replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_SYNCRONIZING));
+			
+			transport = TransportFactory.connect(new URI(master));
+			transport.setTransportListener(this);
+			transport.start();
+	
+			// Make sure the replication directory exists.
+			replicationServer.getTempReplicationDir().mkdirs();
+			
+			ReplicationFrame frame = new ReplicationFrame();
+			frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+			PBSlaveInit payload = new PBSlaveInit();
+			payload.setNodeId(replicationServer.getUri());
+			
+			// Build an inventory of the store files we already have locally
+			// (including partially transferred ones) so the master only needs
+			// to send us the data we are missing.
+	
+			HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
+			
+			// Add all the files that were still being transferred..
+			File tempReplicationDir = replicationServer.getTempReplicationDir();
+			File[] list = tempReplicationDir.listFiles();
+			if( list!=null ) {
+				for (File file : list) {
+					String name = file.getName();
+					if( name.startsWith("database-") ) {
+						int snapshot;
+						try {
+							snapshot = Integer.parseInt(name.substring("database-".length()));
+						} catch (NumberFormatException e) {
+							continue;
+						}
+						
+						PBFileInfo info = ReplicationSupport.createInfo("database", file, 0, file.length());
+						info.setSnapshotId(snapshot);
+						infosMap.put("database", info);
+					} else if( name.startsWith("journal-") ) {
+						PBFileInfo info = ReplicationSupport.createInfo(name, file, 0, file.length());
+						infosMap.put(name, info);
+					}
+				}
+			}
+			
+			// Add all the store files that were not being transferred..
+			KahaDBStore store = replicationServer.getStore();
+			Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+			for (DataFile df : journalFiles.values()) {
+				String name = "journal-" + df.getDataFileId();
+				// Did we have a transfer in progress for that file already?
+				if( infosMap.containsKey(name) ) {
+					continue;
+				}
+				infosMap.put(name, ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
+			}
+			if( !infosMap.containsKey("database") ) {
+				File pageFile = store.getPageFile().getFile();
+				if( pageFile.exists() ) {
+					infosMap.put("database", ReplicationSupport.createInfo("database", pageFile, 0, pageFile.length()));
+				}
+			}
+			
+			ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.values());
+			payload.setCurrentFilesList(infos);
+			
+			frame.setPayload(payload);
+			LOG.info("Sending slave init command to master: " + payload);
+			online = false;
+			transport.oneway(frame);
+		}
+	}
+
+	private void doStop() throws Exception {
+		synchronized (transferMutex) {
+			if( this.transport!=null ) {
+				this.transport.stop();
+				this.transport=null;
+			}
+	
+			// Stop any current transfer sessions.
+			for (TransferSession session : this.transferSessions) {
+				session.stop();
+			}
+	
+			this.transferQueue.clear();
+			
+			this.initResponse=null;
+			this.bulkFiles.clear();	
+			this.online=false;
+	
+			if( journalUpdateFile!=null ) {
+				journalUpdateFile.close();
+				journalUpdateFile=null;
+			}
+			journalUpdateFileId=0;
+		}
+	}
+
+	public void onClusterChange(ClusterState config) {
+		synchronized (transferMutex) {
+			try {
+				if( master==null || !master.equals(config.getMaster()) ) {
+					master = config.getMaster();
+					doStop();
+					doStart();
+				}
+			} catch (Exception e) {
+				LOG.error("Could not restart synchronizing with new master: "+config.getMaster()+", due to: "+e, e);
+			}
+		}
+	}
+
+	public void onCommand(Object command) {
+		try {
+			ReplicationFrame frame = (ReplicationFrame) command;
+			switch (frame.getHeader().getType()) {
+			case SLAVE_INIT_RESPONSE:
+				onSlaveInitResponse(frame, (PBSlaveInitResponse) frame.getPayload());
+				break;
+			case JOURNAL_UPDATE:
+				onJournalUpdate(frame, (PBJournalUpdate) frame.getPayload());
+				break;
+			}
+		} catch (Exception e) {
+			failed(e);
+		}
+	}
+
+	public void onException(IOException error) {
+		failed(error);
+	}
+
+	public void failed(Throwable error) {
+		try {
+			if( started.get() ) {
+				LOG.warn("Replication session to master failed: "+transport.getRemoteAddress(), error);
+				doStop();
+				// Wait a little and try to establish the session again..
+				Thread.sleep(1000);
+				doStart();
+			}
+		} catch (Exception ignore) {
+		}
+	}
+
+	public void transportInterupted() {
+	}
+	public void transportResumed() {
+	}
+	
+	private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
+	    
+	    // Send an ack back as soon as we receive the update.. yeah, it's a little dirty to ack
+	    // before the data is on disk, but the chances of both machines losing power at the same
+	    // time are low, and this way we reduce the latency the master sees from us.
+	    if( update.getSendAck() ) {
+	        ReplicationFrame ack = new ReplicationFrame();
+	        ack.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE_ACK));
+	        ack.setPayload(update.getLocation());
+	        transport.oneway(ack);
+	    }
+	    
+	    // TODO: do the disk write in an async thread so that this thread can
+	    // start reading the next journal update.
+	    
+		boolean onlineRecovery=false;
+		PBJournalLocation location = update.getLocation();
+		byte[] data = update.getData().toByteArray();
+		synchronized (transferMutex) {
+			if( journalUpdateFile==null || journalUpdateFileId!=location.getFileId() ) {
+				if( journalUpdateFile!=null ) {
+					journalUpdateFile.close();
+				}
+				File file;
+				String name = "journal-"+location.getFileId();
+				if( !online ) {
+					file = replicationServer.getTempReplicationFile(name, 0);
+					if( !bulkFiles.containsKey(name) ) {
+						bulkFiles.put(name, new PBFileInfo().setName(name));
+					}
+				} else {
+					// Once the bulk data has been synced we are in online
+					// recovery mode, so write to the live journal file.
+					file = replicationServer.getReplicationFile(name);
+				}
+				journalUpdateFile = new RandomAccessFile(file, "rw");
+				journalUpdateFileId = location.getFileId();
+			}
+
+			journalUpdateFile.seek(location.getOffset());
+			journalUpdateFile.write(data);
+			if( online ) {
+				onlineRecovery=true;
+			}
+		}
+		
+		if( onlineRecovery ) {
+			KahaDBStore store = replicationServer.getStore();
+			// Let the journal know that we appended to one of its files..
+			store.getJournal().appendedExternally(ReplicationSupport.convert(location), data.length);
+			// Now incrementally recover those records.
+			store.incrementalRecover();
+		}
+	}
+	
+	
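+	// Called once every bulk transfer session has drained: close the store,
+	// swap the downloaded files into place, reopen the store, and tell the
+	// master we are online.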
+	private void commitBulkTransfer() {
+		try {
+			
+			synchronized (transferMutex) {
+				
+				LOG.info("Slave synchronization complete, going online...");
+				replicationServer.getStore().close();
+				
+				if( journalUpdateFile!=null ) {
+					journalUpdateFile.close();
+					journalUpdateFile=null;
+				}
+				
+				// If we got a new snapshot of the database, then we need to
+				// delete its auxiliary files too.
+				if( bulkFiles.containsKey("database") ) {
+					PageFile pageFile = replicationServer.getStore().getPageFile();
+					pageFile.getRecoveryFile().delete();
+					pageFile.getFreeFile().delete();
+				}
+				
+				for (PBFileInfo info : bulkFiles.values()) {
+					File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+					File to = replicationServer.getReplicationFile(info.getName());
+					to.getParentFile().mkdirs();
+					move(from, to);
+				}
+				
+				delete(initResponse.getDeleteFilesList());
+				online=true;
+				
+				replicationServer.getStore().open();
+				
+	            replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_ONLINE));
+				LOG.info("Slave is now online.  We are now eligible to become the master.");
+			}
+
+			// Let the master know we are now online.
+			ReplicationFrame frame = new ReplicationFrame();
+			frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
+			transport.oneway(frame);
+			
+		} catch (Throwable e) {
+			LOG.error("Failed to complete the bulk transfer: " + e, e);
+			failed(e);
+		}
+	}
+
+	private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
+		LOG.info("Got init response: " + response);
+		initResponse = response;
+		
+		synchronized (transferMutex) {
+			bulkFiles.clear();
+			
+			List<PBFileInfo> infos = response.getCopyFilesList();
+			for (PBFileInfo info : infos) {
+				
+				bulkFiles.put(info.getName(), info);
+				File target = replicationServer.getReplicationFile(info.getName());
+				// Are we just appending to an existing journal file?
+				if( info.getName().startsWith("journal-") && info.getStart() > 0 && target.exists() ) {
+					// Then copy across the bits we already have..
+					File tempFile = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+					
+					FileInputStream is = new FileInputStream(target);
+					FileOutputStream os = new FileOutputStream(tempFile);
+					try {
+						copy(is, os, info.getStart());
+					} finally {
+						try { is.close(); } catch (Throwable e){}
+						try { os.close(); } catch (Throwable e){}
+					}
+				}
+			}
+
+			transferQueue.clear();
+			transferQueue.addAll(infos);
+		}
+		addTransferSession();
+	}
+
+	private PBFileInfo dequeueTransferQueue() throws Exception {
+		synchronized (transferMutex) {
+			if (transferQueue.isEmpty()) {
+				return null;
+			}
+			return transferQueue.removeFirst();
+		}
+	}
+
+	private void addTransferSession() {
+		synchronized (transferMutex) {
+			while (transport!=null && !transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
+				TransferSession transferSession = new TransferSession();
+				transferSessions.add(transferSession);
+				try {
+					transferSession.start();
+				} catch (Exception e) {
+					transferSessions.remove(transferSession);
+				}
+			}
+			// Once we are done processing all the transfers..
+			if (transferQueue.isEmpty() && transferSessions.isEmpty()) {
+				commitBulkTransfer();
+			}
+		}
+	}
+
+	private void move(File from, File to) throws IOException {
+		// Try a simple rename/mv first..
+		to.delete();
+		if (!from.renameTo(to)) {
+
+			// The rename failed, so copy and delete instead.
+			FileInputStream is = null;
+			FileOutputStream os = null;
+			try {
+				is = new FileInputStream(from);
+				os = new FileOutputStream(to);
+
+				os.getChannel().transferFrom(is.getChannel(), 0, is.getChannel().size());
+			} finally {
+				try {
+					if (is != null) is.close();
+				} catch(Throwable e) {
+				}
+				try {
+					if (os != null) os.close();
+				} catch(Throwable e) {
+				}
+			}
+			from.delete();
+		}
+	}
+
+	class TransferSession implements Service, TransportListener {
+
+		Transport transport;
+		private PBFileInfo info;
+		private File toFile;
+		private final AtomicBoolean stopped = new AtomicBoolean();
+		private long transferStart;
+
+		public void start() throws Exception {
+			LOG.info("File transfer session started.");
+			transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
+			transport.setTransportListener(this);
+			transport.start();
+			sendNextRequestOrStop();
+		}
+
+		private void sendNextRequestOrStop() {
+			try {
+				PBFileInfo info = dequeueTransferQueue();
+				if (info != null) {
+
+					toFile = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+					this.info = info;
+
+					ReplicationFrame frame = new ReplicationFrame();
+					frame.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER));
+					frame.setPayload(info);
+
+					LOG.info("Requesting file: " + info.getName());
+					transferStart = System.currentTimeMillis();
+
+					transport.oneway(frame);
+				} else {
+					stop();
+				}
+
+			} catch (Exception e) {
+				failed(e);
+			}
+		}
+
+		public void stop() throws Exception {
+			if (stopped.compareAndSet(false, true)) {
+				LOG.info("File transfer session stopped.");
+				synchronized (transferMutex) {
+					if (info != null) {
+						transferQueue.addLast(info);
+					}
+					info = null;
+				}
+				transport.stop();
+				synchronized (transferMutex) {
+					transferSessions.remove(TransferSession.this);
+					addTransferSession();
+				}
+			}
+		}
+
+		public void onCommand(Object command) {
+			try {
+				ReplicationFrame frame = (ReplicationFrame) command;
+				InputStream is = (InputStream) frame.getPayload();
+				toFile.getParentFile().mkdirs();
+				
+				RandomAccessFile os = new RandomAccessFile(toFile, "rw");
+				try {
+					os.seek(info.getStart());
+					copy(is, os, frame.getHeader().getPayloadSize());
+					long transferTime = System.currentTimeMillis() - this.transferStart;
+					// KB/sec = bytes / 1024 / seconds; guard against a zero-ms transfer.
+					float rate = frame.getHeader().getPayloadSize() / 1024f / Math.max(transferTime / 1000f, 0.001f);
+					LOG.info("File " + info.getName() + " transferred in " + transferTime + " (ms) at " + rate + " KB/sec");
+				} finally {
+					os.close();
+				}
+				this.info = null;
+				this.toFile = null;
+
+				sendNextRequestOrStop();
+			} catch (Exception e) {
+				failed(e);
+			}
+		}
+
+		public void onException(IOException error) {
+			failed(error);
+		}
+
+		public void failed(Exception error) {
+			try {
+				if (!stopped.get()) {
+					LOG.warn("Replication session failure: " + transport.getRemoteAddress(), error);
+				}
+				stop();
+			} catch (Exception ignore) {
+			}
+		}
+
+		public void transportInterupted() {
+		}
+
+		public void transportResumed() {
+		}
+
+	}
+
+	private void copy(InputStream is, OutputStream os, long length) throws IOException {
+		byte buffer[] = new byte[1024 * 4];
+		int c = 0;
+		long pos = 0;
+		while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
+			os.write(buffer, 0, c);
+			pos += c;
+		}
+	}
+	
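+	// Same bounded copy as above, but for DataOutput sinks (RandomAccessFile
+	// implements DataOutput but is not an OutputStream).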
+	private void copy(InputStream is, DataOutput os, long length) throws IOException {
+		byte buffer[] = new byte[1024 * 4];
+		int c = 0;
+		long pos = 0;
+		while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
+			os.write(buffer, 0, c);
+			pos += c;
+		}
+	}
+	
+	private void delete(List<String> files) {
+		for (String fn : files) {
+			try {
+				replicationServer.getReplicationFile(fn).delete();
+			} catch (IOException e) {
+				// Unknown file name; nothing to delete.
+		}
+	}
+
+}

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+
+public class ReplicationSupport {
+    
+    static public PBJournalLocation convert(Location loc) {
+        if( loc==null ) {
+            return null;
+        }
+        return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
+    }
+    
+    static public Location convert(PBJournalLocation location) {
+        if( location==null ) {
+            return null;
+        }
+        Location rc = new Location();
+        rc.setDataFileId(location.getFileId());
+        rc.setOffset(location.getOffset());
+        return rc;
+    }
+
+
+    static public long copyAndChecksum(File input, File output) throws IOException {
+        FileInputStream is = null;
+        FileOutputStream os = null;
+        try {
+            is = new FileInputStream(input);
+            os = new FileOutputStream(output);
+
+            byte buffer[] = new byte[1024 * 4];
+            int c;
+
+            Checksum checksum = new Adler32();
+            while ((c = is.read(buffer)) >= 0) {
+                os.write(buffer, 0, c);
+                checksum.update(buffer, 0, c);
+            }
+            return checksum.getValue();
+
+        } finally {
+            try {
+                if (is != null) is.close();
+            } catch(Throwable e) {
+            }
+            try {
+                if (os != null) os.close();
+            } catch(Throwable e) {
+            }
+        }
+    }
+
+    public static PBFileInfo createInfo(String name, File file, long start, long end) throws IOException {
+        PBFileInfo rc = new PBFileInfo();
+        rc.setName(name);
+        rc.setChecksum(checksum(file, start, end));
+        rc.setStart(start);
+        rc.setEnd(end);
+        return rc;
+    }
+
+    public static long checksum(File file, long start, long end) throws IOException {
+        RandomAccessFile raf = new RandomAccessFile(file, "r");
+        try {
+            Checksum checksum = new Adler32();
+            byte buffer[] = new byte[1024 * 4];
+            int c;
+            long pos = start;
+            raf.seek(start);
+
+            while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) {
+                checksum.update(buffer, 0, c);
+                pos += c;
+            }
+
+            return checksum.getValue();
+        } finally {
+            try {
+                raf.close();
+            } catch (Throwable e) {
+            }
+        }
+    }
+
+}
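
For illustration, the helpers above compose roughly like the minimal sketch below. It is not part of the commit; the journal path is made up, and the fluent PB setters are those shown above.

    // Round-trip a journal location through its protobuf form.
    Location loc = new Location();
    loc.setDataFileId(3);
    loc.setOffset(8192);
    PBJournalLocation pb = ReplicationSupport.convert(loc);
    Location back = ReplicationSupport.convert(pb);  // dataFileId=3, offset=8192

    // Build the PBFileInfo a slave advertises for a journal file; the
    // checksum is an Adler32 over bytes [0, file.length()).
    File journal = new File("data/replicated-store/db-3.log");  // hypothetical path
    PBFileInfo info = ReplicationSupport.createInfo("journal-3", journal, 0, journal.length());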