You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/11 20:40:02 UTC
svn commit: r889781 [1/3] - in
/activemq/sandbox/activemq-apollo/activemq-kahadb-replication: ./ src/
src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/kahadb/
src/main/java/org/apache/kahadb/replication/ sr...
Author: chirino
Date: Fri Dec 11 19:39:58 2009
New Revision: 889781
URL: http://svn.apache.org/viewvc?rev=889781&view=rev
Log:
Resurrecting the kahadb-replication idea. Going to try to work on it for the next couple of weeks. Fingers crossed I can get it working.
Added:
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml (with props)
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha-broker.xml
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha.xml
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha-broker.xml
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha.xml
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/log4j.properties
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/README Fri Dec 11 19:39:58 2009
@@ -0,0 +1,18 @@
+=======================================================================
+ ActiveMQ KahaDB Replication: HA replication for the KahaDB store
+=======================================================================
+
+Goals:
+ * Provide fast replication of the KahaDB Store
+ * Support multiple replication slaves
+ * Support dynamically adding new replication slaves to a running master
+ * Support multiple master/slave selection strategies
+
+Status:
+ * ZooKeeper based master/slave selection strategies implemented
+ TODO: May need to have master watch for ZooKeeper disconnect
+ to force the master to go offline.
+ * A pure Broadcast/Multicast master/slave selection strategy
+ implemented using ActiveBlaze.
+ TODO: validate that split brain cannot occur.
+
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml Fri Dec 11 19:39:58 2009
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>5.3.0</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-replication</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>avalon-framework</groupId>
+ <artifactId>avalon-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>logkit</groupId>
+ <artifactId>logkit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.14</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq.protobuf</groupId>
+ <artifactId>activemq-protobuf</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-spring</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activeblaze</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <optional>true</optional>
+ </dependency>
+
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>chirino-zk-repo</id>
+ <name>Private ZooKeeper Repo</name>
+ <url>http://people.apache.org/~chirino/zk-repo/</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>maven-xbean-plugin</artifactId>
+ <version>3.4</version>
+ <executions>
+ <execution>
+ <configuration>
+ <namespace>http://activemq.apache.org/schema/kahadb</namespace>
+ </configuration>
+ <goals>
+ <goal>mapping</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.5</source>
+ <target>1.5</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.activemq.protobuf</groupId>
+ <artifactId>activemq-protobuf</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <argLine>-Xmx512M</argLine>
+
+ <systemProperties>
+ <property>
+ <name>org.apache.activemq.default.directory.prefix</name>
+ <value>target/</value>
+ </property>
+ </systemProperties>
+
+ <includes>
+ <include>**/*Test.*</include>
+ </includes>
+ <excludes>
+ <exclude>**/perf/*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Propchange: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/pom.xml
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterListener.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+public interface ClusterListener { // Callback type accepted by ClusterStateManager.addListener/removeListener.
+ // Notification hook: receives a ClusterState (master plus slave list) describing the cluster.
+ public void onClusterChange(ClusterState config);
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterState.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterState { // Mutable bean holding one master string and a list of slave strings.
+ // Starts out as an empty list; setSlaves() replaces it with the caller's list without copying.
+ private List<String> slaves = new ArrayList<String>();
+ private String master; // No initializer: getMaster() returns null until setMaster() is called.
+ // Returns the internal list itself (not a copy), so caller modifications are visible here.
+ public List<String> getSlaves() {
+ return slaves;
+ }
+ public void setSlaves(List<String> slaves) { // Stores the given reference as-is; null is not rejected.
+ this.slaves = slaves;
+ }
+ public String getMaster() { // May be null, see the field declaration.
+ return master;
+ }
+ public void setMaster(String master) { // Plain assignment, no validation.
+ this.master = master;
+ }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import org.apache.activemq.Service;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+/**
+ * This interface is used by the ReplicationService to know when
+ * it should switch between Slave and Master mode.
+ *
+ * @author chirino
+ */
+public interface ClusterStateManager extends Service {
+
+ /**
+ * Adds a ClusterListener which is used to get notifications
+ * of changes in the cluster state.
+ * @param listener the listener to register
+ */
+ void addListener(ClusterListener listener);
+
+ /**
+ * Removes a previously added ClusterListener.
+ * @param listener the listener to unregister
+ */
+ void removeListener(ClusterListener listener);
+
+ /**
+ * Adds a member to the cluster. Adding a member does not mean it is online.
+ * Some ClusterStateManager implementations may keep track of a persistent membership
+ * list so that they can determine if there are enough nodes online to form a quorum
+ * for the purposes of electing a master.
+ *
+ * @param node the member to add
+ */
+ public void addMember(final String node);
+
+ /**
+ * Removes a previously added member.
+ *
+ * @param node the member to remove
+ */
+ public void removeMember(final String node);
+
+ /**
+ * Updates the status of the local node.
+ *
+ * @param status the new status of the local node
+ */
+ public void setMemberStatus(final PBClusterNodeStatus status);
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * A BrokerService that does no broker work of its own: start() and stop() only delegate to the
+ * configured ReplicationService. It allows you to create an activemq.xml file which does not
+ * actually start a broker; per the original author, the ReplicationService creates the actual BrokerService.
+ *
+ * @author chirino
+ * @org.apache.xbean.XBean element="kahadbReplicationBroker"
+ */
+public class ReplicationBrokerService extends BrokerService {
+ // Delegate for start()/stop(); stays null until setReplicationService() is called.
+ ReplicationService replicationService;
+ AtomicBoolean started = new AtomicBoolean(); // Initially false; flipped by start() and stop().
+
+ public ReplicationService getReplicationService() {
+ return replicationService;
+ }
+
+ public void setReplicationService(ReplicationService replicationService) {
+ this.replicationService = replicationService;
+ }
+ // Does not call super.start(); only the false -> true transition starts the delegate.
+ @Override
+ public void start() throws Exception {
+ if( started.compareAndSet(false, true) ) { // Repeated start() calls are no-ops while the flag is true.
+ replicationService.start(); // NOTE(review): no null check, and the flag stays true if this throws.
+ }
+ }
+ // Does not call super.stop(); only the true -> false transition stops the delegate.
+ @Override
+ public void stop() throws Exception {
+ if( started.compareAndSet(true, false) ) { // stop() without a preceding start() is a no-op.
+ replicationService.stop();
+ }
+ }
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationFrame.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import org.apache.kahadb.replication.pb.PBHeader;
+
+public class ReplicationFrame { // Mutable bean pairing a PBHeader with an untyped payload object.
+ // Both fields are package-private and have no initializer, so they are null until set.
+ PBHeader header;
+ Object payload; // Declared as Object; the concrete payload type is not fixed by this class.
+
+ public PBHeader getHeader() {
+ return header;
+ }
+ public void setHeader(PBHeader header) { // Plain assignment, no validation.
+ this.header = header;
+ }
+ // Callers get the stored reference back as Object and have to cast it themselves.
+ public Object getPayload() {
+ return payload;
+ }
+ public void setPayload(Object payload) { // Plain assignment, no validation.
+ this.payload = payload;
+ }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.Callback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.journal.ReplicationTarget;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.util.ByteSequence;
+
+
+public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationService.class);
+
+ private final ReplicationService replicationService;
+
+ private Object serverMutex = new Object() {
+ };
+ private TransportServer server;
+
+ private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
+
+ private final AtomicInteger nextSnapshotId = new AtomicInteger();
+ private final Object requestMutex = new Object(){};
+ private Location requestLocation;
+ private CountDownLatch requestLatch;
+ private int minimumReplicas;
+
+ /**
+  * Creates a master bound to the given service. The minimum replica count
+  * is captured here, so later changes on the service are not seen by this
+  * instance.
+  *
+  * @param replicationService the owning replication service
+  */
+ public ReplicationMaster(ReplicationService replicationService) {
+     this.minimumReplicas = replicationService.getMinimumReplicas();
+     this.replicationService = replicationService;
+ }
+
+ /**
+  * Binds a transport server on the replication service's URI, starts
+  * accepting slave connections and then registers this master as the
+  * journal's replication target.
+  *
+  * @throws Exception if the URI is invalid or the server cannot be bound or started
+  */
+ public void start() throws Exception {
+     // Every accepted transport becomes a started, registered ReplicationSession.
+     final TransportAcceptListener acceptListener = new TransportAcceptListener() {
+         public void onAccept(Transport transport) {
+             try {
+                 synchronized (serverMutex) {
+                     ReplicationSession accepted = new ReplicationSession(transport);
+                     accepted.start();
+                     addSession(accepted);
+                 }
+             } catch (Exception e) {
+                 LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
+             }
+         }
+
+         public void onAcceptError(Exception e) {
+             LOG.info("Could not accept replication connection: " + e, e);
+         }
+     };
+
+     synchronized (serverMutex) {
+         server = TransportFactory.bind(new URI(replicationService.getUri()));
+         server.setAcceptListener(acceptListener);
+         server.start();
+     }
+
+     // From here on the journal hands every update to replicate().
+     replicationService.getStore().getJournal().setReplicationTarget(this);
+ }
+
+ boolean isStarted() {
+ synchronized (serverMutex) {
+ return server != null;
+ }
+ }
+
+ public void stop() throws Exception {
+ replicationService.getStore().getJournal().setReplicationTarget(null);
+ synchronized (serverMutex) {
+ if (server != null) {
+ server.stop();
+ server = null;
+ }
+ }
+
+ ArrayList<ReplicationSession> sessionsSnapshot;
+ synchronized (this.sessions) {
+ sessionsSnapshot = this.sessions;
+ }
+
+ for (ReplicationSession session : sessionsSnapshot) {
+ session.stop();
+ }
+ }
+
+ /**
+  * Registers a session by replacing the session list with a copy that
+  * also contains the given session.
+  *
+  * The copy is completed before it is assigned to the field, so a thread
+  * that picks up the new list never observes it while it is being modified.
+  *
+  * @param session the session to register
+  */
+ protected void addSession(ReplicationSession session) {
+     synchronized (sessions) {
+         ArrayList<ReplicationSession> updated = new ArrayList<ReplicationSession>(sessions);
+         updated.add(session);
+         sessions = updated;
+     }
+ }
+
+ /**
+  * Unregisters a session by replacing the session list with a copy that
+  * no longer contains the given session.
+  *
+  * The copy is completed before it is assigned to the field, so a thread
+  * that picks up the new list never observes it while it is being modified.
+  *
+  * @param session the session to unregister
+  */
+ protected void removeSession(ReplicationSession session) {
+     synchronized (sessions) {
+         ArrayList<ReplicationSession> updated = new ArrayList<ReplicationSession>(sessions);
+         updated.remove(session);
+         sessions = updated;
+     }
+ }
+
+ /**
+  * Cluster change notification; intentionally a no-op on the master.
+  *
+  * @param config the new cluster state (ignored)
+  */
+ public void onClusterChange(ClusterState config) {
+ // For now, we don't really care about changes in the slave config..
+ }
+
+ /**
+  * This is called by the Journal so that we can replicate the update to the
+  * slaves.
+  *
+  * The update is sent to every session that is subscribed to journal
+  * updates. For a sync request with at least one required replica, the
+  * call then blocks until {@code minimumReplicas} acks have been counted,
+  * the master has been stopped, or the calling thread is interrupted.
+  *
+  * @param location journal location of the update
+  * @param sequence the bytes written at that location
+  * @param sync whether the caller wants to wait for slave acknowledgements
+  */
+ public void replicate(Location location, ByteSequence sequence, boolean sync) {
+     ArrayList<ReplicationSession> sessionsSnapshot;
+     synchronized (this.sessions) {
+         // Hurrah for copy on write..
+         sessionsSnapshot = this.sessions;
+     }
+
+     // With no required replicas there is nobody to wait for, so always
+     // replicate asynchronously. A negative setting is treated the same way
+     // because CountDownLatch rejects negative counts.
+     if (minimumReplicas <= 0) {
+         sync = false;
+     }
+     CountDownLatch latch = null;
+     if (sync) {
+         latch = new CountDownLatch(minimumReplicas);
+         synchronized (requestMutex) {
+             requestLatch = latch;
+             requestLocation = location;
+         }
+     }
+
+     ReplicationFrame frame = null;
+     for (ReplicationSession session : sessionsSnapshot) {
+         if (session.subscribedToJournalUpdates.get()) {
+
+             // Lazily create the frame since we may not have any available
+             // sessions to send to.
+             if (frame == null) {
+                 frame = new ReplicationFrame();
+                 frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
+                 PBJournalUpdate payload = new PBJournalUpdate();
+                 payload.setLocation(ReplicationSupport.convert(location));
+                 payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
+                 payload.setSendAck(sync);
+                 frame.setPayload(payload);
+             }
+
+             // TODO: use async send threads so that the frames can be pushed
+             // out in parallel.
+             try {
+                 session.setLastUpdateLocation(location);
+                 session.transport.oneway(frame);
+             } catch (IOException e) {
+                 session.onException(e);
+             }
+         }
+     }
+
+     if (sync) {
+         try {
+             int timeout = 500;
+             int counter = 0;
+             while (true) {
+                 if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
+                     return;
+                 }
+                 // Stop waiting once the master has been shut down.
+                 if (!isStarted()) {
+                     return;
+                 }
+                 counter++;
+                 if ((counter % 10) == 0) {
+                     LOG.warn("KahaDB is waiting for slave to come online. " + (timeout * counter / 1000.f) + " seconds have elapsed.");
+                 }
+             }
+         } catch (InterruptedException e) {
+             // Give up waiting, but keep the interrupt visible to the caller.
+             Thread.currentThread().interrupt();
+         }
+     }
+
+ }
+
+ private void ackAllFromTo(Location lastAck, Location newAck) {
+ Location l;
+ java.util.concurrent.CountDownLatch latch;
+ synchronized (requestMutex) {
+ latch = requestLatch;
+ l = requestLocation;
+ }
+ if( l == null ) {
+ return;
+ }
+
+ if (lastAck == null || lastAck.compareTo(l) < 0) {
+ if (newAck != null && l.compareTo(newAck) <= 0) {
+ latch.countDown();
+ return;
+ }
+ }
+ }
+
+ class ReplicationSession implements Service, TransportListener {
+
+ // Connection to the slave served by this session.
+ private final Transport transport;
+ // Set once the slave has sent SLAVE_INIT; replicate() only sends to subscribed sessions.
+ private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
+ // Makes stop() a one-shot operation.
+ private boolean stopped;
+
+ // Copy of the page file taken for this slave during onSlaveInit.
+ private File snapshotFile;
+ // Ids of the journal files listed for transfer to this slave; null when none are tracked.
+ private HashSet<Integer> journalReplicatedFiles;
+ // Last journal location acknowledged by the slave.
+ private Location lastAckLocation;
+ // Last journal location sent to the slave.
+ private Location lastUpdateLocation;
+ // True once the slave has reported SLAVE_ONLINE.
+ private boolean online;
+
+ /**
+  * @param transport the accepted connection to the slave
+  */
+ public ReplicationSession(Transport transport) {
+ this.transport = transport;
+ }
+
+ /**
+  * Records the journal location of the update most recently sent to this slave.
+  *
+  * @param lastUpdateLocation the location that was just sent
+  */
+ public synchronized void setLastUpdateLocation(Location lastUpdateLocation) {
+     this.lastUpdateLocation = lastUpdateLocation;
+ }
+
+ /**
+  * Installs this session as the transport's listener and starts the transport.
+  *
+  * @throws Exception if the transport cannot be started
+  */
+ public void start() throws Exception {
+     final Transport slaveTransport = this.transport;
+     slaveTransport.setTransportListener(this);
+     slaveTransport.start();
+ }
+
+ /**
+  * Stops this session once: unsubscribes it from journal updates, removes
+  * it from the master's session list, releases its replication data and
+  * stops the transport. Further calls are no-ops.
+  *
+  * @throws Exception if the transport fails to stop
+  */
+ public synchronized void stop() throws Exception {
+     if (stopped) {
+         return;
+     }
+     stopped = true;
+
+     // Without these two steps a dead session would stay in the master's
+     // list forever and replicate() would keep writing to its stopped transport.
+     subscribedToJournalUpdates.set(false);
+     removeSession(this);
+
+     deleteReplicationData();
+     transport.stop();
+ }
+
+ synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
+ Location ack = ReplicationSupport.convert(location);
+ if (online) {
+ ackAllFromTo(lastAckLocation, ack);
+ }
+ lastAckLocation = ack;
+ }
+
+ synchronized private void onSlaveOnline(ReplicationFrame frame) {
+ deleteReplicationData();
+ online = true;
+ if (lastAckLocation != null) {
+ ackAllFromTo(null, lastAckLocation);
+ }
+
+ }
+
+ /**
+  * Dispatches a frame received from the slave to its handler. Frame types
+  * not listed here are ignored. Any exception fails the session.
+  *
+  * @param command the received object, expected to be a ReplicationFrame
+  */
+ public void onCommand(Object command) {
+     try {
+         final ReplicationFrame frame = (ReplicationFrame) command;
+         final PBType type = frame.getHeader().getType();
+         switch (type) {
+         case SLAVE_INIT:
+             onSlaveInit(frame, (PBSlaveInit) frame.getPayload());
+             break;
+         case SLAVE_ONLINE:
+             onSlaveOnline(frame);
+             break;
+         case FILE_TRANSFER:
+             onFileTransfer(frame, (PBFileInfo) frame.getPayload());
+             break;
+         case JOURNAL_UPDATE_ACK:
+             onJournalUpdateAck(frame, (PBJournalLocation) frame.getPayload());
+             break;
+         default:
+             // Nothing to do for other frame types.
+             break;
+         }
+     } catch (Exception e) {
+         LOG.warn("Slave request failed: " + e, e);
+         failed(e);
+     }
+ }
+
+ /**
+  * Transport error callback; treats the error as a session failure.
+  *
+  * @param error the I/O error reported for the transport
+  */
+ public void onException(IOException error) {
+ failed(error);
+ }
+
+ /**
+  * Shuts this session down after a failure. A problem while stopping is
+  * logged rather than propagated, since the session is being discarded anyway.
+  *
+  * @param error the failure that triggered the shutdown
+  */
+ public void failed(Exception error) {
+     try {
+         stop();
+     } catch (Exception e) {
+         LOG.debug("Could not cleanly stop failed replication session: " + e, e);
+     }
+ }
+
+ /** Transport interruption callback; nothing to do for a replication session. */
+ public void transportInterupted() {
+ }
+
+ /** Transport resumption callback; nothing to do for a replication session. */
+ public void transportResumed() {
+ }
+
+ private void deleteReplicationData() {
+ if (snapshotFile != null) {
+ snapshotFile.delete();
+ snapshotFile = null;
+ }
+ if (journalReplicatedFiles != null) {
+ journalReplicatedFiles = null;
+ updateJournalReplicatedFiles();
+ }
+ }
+
+ /**
+  * Handles SLAVE_INIT: subscribes the slave to journal updates, checkpoints
+  * the store, takes a snapshot copy of the page file and answers with a
+  * SLAVE_INIT_RESPONSE listing the files the slave has to copy and the
+  * files it should delete.
+  *
+  * @param frame the received frame
+  * @param slaveInit the slave's description of the files it currently holds
+  * @throws Exception if the checkpoint, the snapshot copy or the send fails
+  */
+ private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
+
+ // Start sending journal updates to the slave.
+ subscribedToJournalUpdates.set(true);
+
+ // We could look at the slave state sent in the slaveInit and decide
+ // that a full sync is not needed..
+ // but for now we will do a full sync every time.
+ ReplicationFrame rc = new ReplicationFrame();
+ final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
+ rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
+ rc.setPayload(rcPayload);
+
+ // Setup a map of all the files that the slave has
+ final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
+ for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
+ slaveFiles.put(info.getName(), info);
+ }
+
+ final KahaDBStore store = replicationService.getStore();
+ store.checkpoint(new Callback() {
+ public void execute() throws Exception {
+ // This call back is executed once the checkpoint is
+ // completed and all data has been synced to disk,
+ // but while a lock is still held on the store so
+ // that no updates are done while we are in this
+ // method.
+
+ KahaDBStore store = replicationService.getStore();
+ // First init for this session: start acking from the store's current position.
+ if (lastAckLocation == null) {
+ lastAckLocation = store.getLastUpdatePosition();
+ }
+
+ int snapshotId = nextSnapshotId.incrementAndGet();
+ File file = store.getPageFile().getFile();
+ File dir = replicationService.getTempReplicationDir();
+ dir.mkdirs();
+ snapshotFile = new File(dir, "snapshot-" + snapshotId);
+
+ journalReplicatedFiles = new HashSet<Integer>();
+
+ // Store the list files associated with the snapshot.
+ ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
+ Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+ for (DataFile df : journalFiles.values()) {
+ // Look at what the slave has so that only the missing
+ // bits are transferred.
+ String name = "journal-" + df.getDataFileId();
+ PBFileInfo slaveInfo = slaveFiles.remove(name);
+
+ // Use the checksum info to see if the slave has the
+ // file already.. Checksums are less accurate for
+ // small amounts of data.. so ignore small files.
+ if (slaveInfo != null && slaveInfo.getEnd() > 1024 * 512) {
+ // If the slave's file checksum matches what we
+ // have..
+ if (ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd()) == slaveInfo.getChecksum()) {
+ // is Our file longer? then we need to continue
+ // transferring the rest of the file.
+ if (df.getLength() > slaveInfo.getEnd()) {
+ snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
+ journalReplicatedFiles.add(df.getDataFileId());
+ continue;
+ } else {
+ // No need to replicate this file.
+ continue;
+ }
+ }
+ }
+
+ // If we got here then it means we need to transfer the
+ // whole file.
+ snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
+ journalReplicatedFiles.add(df.getDataFileId());
+ }
+
+ // The page file is copied to the snapshot file and described
+ // to the slave under the name "database".
+ PBFileInfo info = new PBFileInfo();
+ info.setName("database");
+ info.setSnapshotId(snapshotId);
+ info.setStart(0);
+ info.setEnd(file.length());
+ info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
+ snapshotInfos.add(info);
+
+ rcPayload.setCopyFilesList(snapshotInfos);
+ // Whatever is left in slaveFiles (other than the database) has no
+ // counterpart in the master's journal, so the slave is told to delete it.
+ ArrayList<String> deleteFiles = new ArrayList<String>();
+ slaveFiles.remove("database");
+ for (PBFileInfo unused : slaveFiles.values()) {
+ deleteFiles.add(unused.getName());
+ }
+ rcPayload.setDeleteFilesList(deleteFiles);
+
+ updateJournalReplicatedFiles();
+ }
+
+ });
+
+ // NOTE(review): this assumes checkpoint() has run the callback before it
+ // returns, so rcPayload is fully populated here - confirm against KahaDBStore.
+ transport.oneway(rc);
+ }
+
+ /**
+  * Handles FILE_TRANSFER: streams the requested byte range of a
+  * replication file back to the slave in a FILE_TRANSFER_RESPONSE whose
+  * header carries the payload size.
+  *
+  * @param frame the received frame
+  * @param fileInfo name and start/end offsets of the requested range
+  * @throws IOException if the file name is unknown, the range is invalid
+  *         or not available, or the data cannot be read or sent
+  */
+ private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
+     File file = replicationService.getReplicationFile(fileInfo.getName());
+     long start = fileInfo.getStart();
+     long payloadSize = fileInfo.getEnd() - start;
+
+     // The range comes from the remote side; reject nonsense before using it.
+     if (start < 0 || payloadSize < 0) {
+         throw new IOException("Invalid replication file range requested: " + start + " to " + fileInfo.getEnd());
+     }
+     if (file.length() < start + payloadSize) {
+         throw new IOException("Requested replication file dose not have enough data.");
+     }
+
+     ReplicationFrame rc = new ReplicationFrame();
+     rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
+
+     FileInputStream is = new FileInputStream(file);
+     try {
+         // skip() may skip fewer bytes than asked for, so keep going until
+         // the stream really is positioned at the requested offset.
+         long remaining = start;
+         while (remaining > 0) {
+             long skipped = is.skip(remaining);
+             if (skipped <= 0) {
+                 throw new IOException("Could not seek to offset " + start + " of replication file: " + file);
+             }
+             remaining -= skipped;
+         }
+         rc.setPayload(is);
+         transport.oneway(rc);
+     } finally {
+         try {
+             is.close();
+         } catch (Throwable ignored) {
+             // Best effort: a close failure must not mask the outcome of the transfer.
+         }
+     }
+ }
+
+ }
+
+ /**
+  * Looks at all the journal files being currently replicated and informs the
+  * KahaDB so that it does not delete them while the replication is occurring.
+  *
+  * The union over all sessions is computed first and then applied to the
+  * store's set by removing stale ids and adding missing ones, so an id
+  * that is still in use is never taken out of the set, not even briefly.
+  */
+ private void updateJournalReplicatedFiles() {
+     ArrayList<ReplicationSession> sessionsSnapshot;
+     synchronized (this.sessions) {
+         // Hurrah for copy on write..
+         sessionsSnapshot = this.sessions;
+     }
+
+     HashSet<Integer> inUse = new HashSet<Integer>();
+     for (ReplicationSession session : sessionsSnapshot) {
+         // Read the field once; the session may null it out concurrently.
+         HashSet<Integer> sessionFiles = session.journalReplicatedFiles;
+         if (sessionFiles != null) {
+             inUse.addAll(sessionFiles);
+         }
+     }
+
+     HashSet<Integer> files = replicationService.getStore().getJournalFilesBeingReplicated();
+     files.retainAll(inUse);
+     files.addAll(inUse);
+ }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationService.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+
+/**
+ * Handles interfacing with the ClusterStateManager and handles activating the
+ * slave or master facets of the broker.
+ *
+ * @author chirino
+ * @org.apache.xbean.XBean element="kahadbReplication"
+ */
+public class ReplicationService implements Service, ClusterListener {
+
+ // Prefix of the names under which journal data files are exchanged, e.g. "journal-12".
+ private static final String JOURNAL_PREFIX = "journal-";
+
+ private static final Log LOG = LogFactory.getLog(ReplicationService.class);
+
+ // Configuration URI handed to BrokerFactory when this node becomes the master.
+ private String brokerURI = "xbean:broker.xml";
+ // Directory given to the lazily created KahaDBStore.
+ private File directory = new File(IOHelper.getDefaultDataDirectory());
+ // Defaults to the "replication" directory under the store directory when left unset.
+ private File tempReplicationDir;
+ // Identifies this node in the cluster; also the URI the master binds its transport server to.
+ private String uri;
+ private ClusterStateManager cluster;
+ // Number of slave acks a master waits for on sync journal updates.
+ private int minimumReplicas=1;
+
+ private KahaDBStore store;
+
+ // Most recent state received through onClusterChange.
+ private ClusterState clusterState;
+ // Non-null only while this node is running as the master.
+ private BrokerService brokerService;
+ private ReplicationMaster master;
+ // Non-null only while this node is running as a slave.
+ private ReplicationSlave slave;
+
+ /**
+  * Opens the store, registers with the cluster and announces this node as
+  * an unconnected slave. The cluster's change notifications then decide
+  * whether the node runs as master or slave.
+  *
+  * @throws IllegalArgumentException if no cluster has been configured
+  * @throws Exception if the store or the cluster fails to start
+  */
+ public void start() throws Exception {
+     if (cluster == null) {
+         throw new IllegalArgumentException("The cluster field has not been set.");
+     }
+
+     final KahaDBStore kahaDb = getStore();
+     kahaDb.open();
+
+     // Listen before starting so that no cluster configuration is missed.
+     cluster.addListener(this);
+     cluster.start();
+
+     final String self = getUri();
+     cluster.addMember(self);
+     cluster.setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
+ }
+
+ /**
+  * Builds a status record for this node.
+  *
+  * @param state the state to report
+  * @return a status carrying this node's URI, the store's last update
+  *         position and the given state
+  * @throws IOException if the store's last update position cannot be read
+  */
+ public PBClusterNodeStatus createStatus(State state) throws IOException {
+     final PBClusterNodeStatus nodeStatus = new PBClusterNodeStatus();
+     nodeStatus.setConnectUri(getUri());
+     nodeStatus.setLastUpdate(
+         ReplicationSupport.convert(getStore().getLastUpdatePosition()));
+     nodeStatus.setState(state);
+     return nodeStatus;
+ }
+
+ /**
+  * Leaves the cluster, stops whichever of the master/slave facets is
+  * running and closes the store.
+  *
+  * Each step runs even when an earlier one throws, so a failing cluster or
+  * facet cannot leave the store open. A cluster that was never configured
+  * is skipped.
+  *
+  * @throws Exception a failure raised by one of the shutdown steps
+  */
+ public void stop() throws Exception {
+     try {
+         if (cluster != null) {
+             cluster.removeListener(this);
+             cluster.stop();
+         }
+     } finally {
+         try {
+             stopMaster();
+         } finally {
+             try {
+                 stopSlave();
+             } finally {
+                 getStore().close();
+             }
+         }
+     }
+ }
+
+ /**
+  * Reacts to a new cluster configuration by switching this node into the
+  * role the configuration assigns to it. Failures are logged, not thrown.
+  *
+  * @param clusterState the new cluster configuration
+  */
+ public void onClusterChange(ClusterState clusterState) {
+     this.clusterState = clusterState;
+     try {
+         synchronized (cluster) {
+             if (areWeTheSlave(clusterState)) {
+                 // A former master has to give up that role first.
+                 stopMaster();
+                 if (clusterState.getMaster() != null) {
+                     // Start the slave if needed and let it follow the master.
+                     startSlave();
+                     slave.onClusterChange(clusterState);
+                 } else {
+                     // No master to follow at the moment.
+                     stopSlave();
+                 }
+                 return;
+             }
+
+             if (areWeTheMaster(clusterState)) {
+                 // A former slave has to give up that role first.
+                 stopSlave();
+                 startMaster();
+                 master.onClusterChange(clusterState);
+                 return;
+             }
+
+             // We are not part of the configuration (neither master nor
+             // slave), so shut down whatever was running and report that
+             // we are unconnected.
+             stopMaster();
+             stopSlave();
+             getCluster().setMemberStatus(createStatus(State.SLAVE_UNCONNECTED));
+         }
+     } catch (Exception e) {
+         LOG.warn("Unexpected Error: " + e, e);
+     }
+ }
+
+ /**
+  * Switches this node into the master role unless it already is the
+  * master: reports MASTER to the cluster, starts the broker and then
+  * starts the replication master.
+  */
+ private void startMaster() throws IOException, Exception {
+     if (master != null) {
+         return;
+     }
+     LOG.info("Starting replication master.");
+     getCluster().setMemberStatus(createStatus(State.MASTER));
+
+     brokerService = createBrokerService();
+     brokerService.start();
+
+     master = new ReplicationMaster(this);
+     master.start();
+ }
+
+ /**
+  * Stops and discards the replication slave, if one is running.
+  */
+ private void stopSlave() throws Exception {
+     if (slave == null) {
+         return;
+     }
+     LOG.info("Stopping replication slave.");
+     slave.stop();
+     slave = null;
+ }
+
+ /**
+  * Creates and starts the replication slave unless one is already running.
+  */
+ private void startSlave() throws Exception {
+     if (slave != null) {
+         return;
+     }
+     LOG.info("Starting replication slave.");
+     slave = new ReplicationSlave(this);
+     slave.start();
+ }
+
+ /**
+  * Leaves the master role, if this node currently holds it: stops the
+  * replication master and the broker, then re-opens the store.
+  */
+ private void stopMaster() throws Exception, IOException {
+     if (master == null) {
+         return;
+     }
+     LOG.info("Stopping replication master.");
+     master.stop();
+     master = null;
+
+     brokerService.stop();
+     brokerService = null;
+
+     // Stopping the broker service also stops the store, so it has to be
+     // opened again for this node to keep working with it.
+     getStore().open();
+ }
+
+ /**
+  * @return the broker started for the master role, or null when this node is not running as master
+  */
+ public BrokerService getBrokerService() {
+ return brokerService;
+ }
+
+ private BrokerService createBrokerService() throws Exception {
+ BrokerService rc = BrokerFactory.createBroker(brokerURI);
+ rc.setPersistenceAdapter(getStore());
+ return rc;
+ }
+
+ /**
+  * @return the cluster state last received through onClusterChange, or null if none has arrived yet
+  */
+ public ClusterState getClusterState() {
+ return clusterState;
+ }
+
+ /**
+  * @return true if this node's URI is listed among the slaves of the given configuration
+  */
+ private boolean areWeTheSlave(ClusterState config) {
+ return config.getSlaves().contains(uri);
+ }
+
+ /**
+  * @return true if this node's URI equals the master of the given configuration
+  */
+ // NOTE(review): throws NullPointerException when uri has not been set.
+ private boolean areWeTheMaster(ClusterState config) {
+ return uri.equals(config.getMaster());
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Accessors
+ ///////////////////////////////////////////////////////////////////
+
+ /**
+  * Maps a replication file name to the live file in the store.
+  *
+  * @param fn either "database" or "journal-" followed by a data file id
+  * @return the store's page file, or the journal data file with that id
+  * @throws IOException if the name has neither of the two accepted forms
+  */
+ public File getReplicationFile(String fn) throws IOException {
+     if (fn.equals("database")) {
+         return getStore().getPageFile().getFile();
+     }
+     if (!fn.startsWith(JOURNAL_PREFIX)) {
+         throw new IOException("Unknown replication file name: " + fn);
+     }
+
+     final int dataFileId;
+     try {
+         dataFileId = Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
+     } catch (NumberFormatException e) {
+         throw new IOException("Unknown replication file name: " + fn);
+     }
+     return getStore().getJournal().getFile(dataFileId);
+ }
+
+
+ /**
+  * Maps a replication file name to its location in the temporary
+  * replication directory.
+  *
+  * @param fn either "database" or "journal-" followed by a data file id
+  * @param snapshotId appended to the name of the temporary database file;
+  *        not used for journal files
+  * @return the temporary file for that name
+  * @throws IOException if the name has neither of the two accepted forms
+  */
+ public File getTempReplicationFile(String fn, int snapshotId) throws IOException {
+     if (fn.equals("database")) {
+         return new File(getTempReplicationDir(), "database-" + snapshotId);
+     }
+     if (!fn.startsWith(JOURNAL_PREFIX)) {
+         throw new IOException("Unknown replication file name: " + fn);
+     }
+
+     // The id is parsed only to validate the name; the file keeps the name as given.
+     try {
+         Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
+     } catch (NumberFormatException e) {
+         throw new IOException("Unknown replication file name: " + fn);
+     }
+     return new File(getTempReplicationDir(), fn);
+ }
+
+ /**
+  * @return true while a replication master instance is held by this service
+  */
+ public boolean isMaster() {
+ return master != null;
+ }
+
+ /**
+  * @return the directory for temporary replication files; when none was
+  *         configured, the "replication" directory under the store
+  *         directory is chosen on first use and remembered
+  */
+ public File getTempReplicationDir() {
+     if (tempReplicationDir != null) {
+         return tempReplicationDir;
+     }
+     tempReplicationDir = new File(getStore().getDirectory(), "replication");
+     return tempReplicationDir;
+ }
+
+ /**
+  * @param tempReplicationDir the directory to use for temporary replication files
+  */
+ public void setTempReplicationDir(File tempReplicationDir) {
+     this.tempReplicationDir = tempReplicationDir;
+ }
+
+ /**
+  * @return the store used by this service; when none was configured, a
+  *         KahaDBStore on the configured directory is created on first use
+  */
+ public KahaDBStore getStore() {
+     if (store != null) {
+         return store;
+     }
+     store = new KahaDBStore();
+     store.setDirectory(directory);
+     return store;
+ }
+
+ /**
+  * @param store the store this service should use
+  */
+ public void setStore(KahaDBStore store) {
+     this.store = store;
+ }
+
+ /** @return the directory given to a lazily created store */
+ public File getDirectory() {
+ return directory;
+ }
+ /** @param directory the directory to give to a lazily created store */
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ /** @return the URI used to create the broker when this node becomes master */
+ public String getBrokerURI() {
+ return brokerURI;
+ }
+ /** @param brokerURI the URI used to create the broker when this node becomes master */
+ public void setBrokerURI(String brokerURI) {
+ this.brokerURI = brokerURI;
+ }
+
+ /** @return the URI identifying this node in the cluster */
+ public String getUri() {
+ return uri;
+ }
+ /** @param nodeId the URI identifying this node in the cluster */
+ public void setUri(String nodeId) {
+ this.uri = nodeId;
+ }
+
+ /** @return the cluster state manager this service registers with */
+ public ClusterStateManager getCluster() {
+ return cluster;
+ }
+ /** @param cluster the cluster state manager; must be set before start() */
+ public void setCluster(ClusterStateManager cluster) {
+ this.cluster = cluster;
+ }
+
+ /** @return the number of slave acks a master waits for on sync updates */
+ public int getMinimumReplicas() {
+ return minimumReplicas;
+ }
+
+ /**
+  * A ReplicationMaster reads this value when it is constructed, so a
+  * change does not affect a master that is already running.
+  *
+  * @param minimumReplicas the number of slave acks a master waits for on sync updates
+  */
+ public void setMinimumReplicas(int minimumReplicas) {
+ this.minimumReplicas = minimumReplicas;
+ }
+
+
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+public class ReplicationSlave implements Service, ClusterListener, TransportListener {
+
+ // NOTE(review): presumably the cap on concurrent TransferSessions - its use is outside this view.
+ private static final int MAX_TRANSFER_SESSIONS = 1;
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSlave.class);
+
+ private final ReplicationService replicationServer;
+ // Connection to the current master; null while disconnected.
+ private Transport transport;
+
+ // Used to bulk transfer the master state over to the slave..
+ // transferMutex guards the connection and transfer state below.
+ private final Object transferMutex = new Object();
+ private final LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
+ private final LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
+ private final HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();
+ private PBSlaveInitResponse initResponse;
+ // False from the moment a SLAVE_INIT is sent; decides whether journal
+ // updates go to temporary or live journal files.
+ private boolean online;
+ private final AtomicBoolean started = new AtomicBoolean();
+
+ // Used to do real time journal updates..
+ int journalUpdateFileId;
+ RandomAccessFile journalUpateFile;
+ // URI of the master this slave is following.
+ private String master;
+
+ /**
+  * @param replicationServer the owning replication service
+  */
+ public ReplicationSlave(ReplicationService replicationServer) {
+ this.replicationServer = replicationServer;
+ }
+
+ /**
+  * Starts the slave once; the first call applies the service's current
+  * cluster state, which connects to the master it names.
+  */
+ public void start() throws Exception {
+     if (!started.compareAndSet(false, true)) {
+         return;
+     }
+     onClusterChange(replicationServer.getClusterState());
+ }
+
+ /**
+  * Stops the slave if it is started; further calls are no-ops.
+  */
+ public void stop() throws Exception {
+     if (!started.compareAndSet(true, false)) {
+         return;
+     }
+     doStop();
+ }
+
+ /**
+  * Connects to the current master and sends it a SLAVE_INIT frame that
+  * lists the files this slave already holds: files left in the temporary
+  * replication directory, the store's journal files and the page file.
+  * Does nothing if the slave has been stopped in the meantime.
+  */
+ private void doStart() throws Exception, URISyntaxException, IOException {
+ synchronized (transferMutex) {
+
+ // Failure recovery might be trying to start us back up,
+ // but the Replication server may have already stopped us so there is no need to start up.
+ if( !started.get() ) {
+ return;
+ }
+
+ replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_SYNCRONIZING));
+
+ transport = TransportFactory.connect(new URI(master));
+ transport.setTransportListener(this);
+ transport.start();
+
+ // Make sure the replication directory exists.
+ replicationServer.getTempReplicationDir().mkdirs();
+
+ ReplicationFrame frame = new ReplicationFrame();
+ frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+ PBSlaveInit payload = new PBSlaveInit();
+ payload.setNodeId(replicationServer.getUri());
+
+ // Collect a description of every file this slave currently holds,
+ // keyed by replication file name, to report to the master.
+
+ HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
+
+ // Add all the files that were being transferred..
+ File tempReplicationDir = replicationServer.getTempReplicationDir();
+ File[] list = tempReplicationDir.listFiles();
+ if( list!=null ) {
+ for (File file : list) {
+ String name = file.getName();
+ if( name.startsWith("database-") ) {
+ int snapshot;
+ try {
+ snapshot = Integer.parseInt(name.substring("database-".length()));
+ } catch (NumberFormatException e) {
+ // Not a snapshot file name we produced; leave it out.
+ continue;
+ }
+
+ PBFileInfo info = ReplicationSupport.createInfo("database", file, 0, file.length());
+ info.setSnapshotId(snapshot);
+ infosMap.put("database", info);
+ } else if( name.startsWith("journal-") ) {
+ PBFileInfo info = ReplicationSupport.createInfo(name, file, 0, file.length());
+ infosMap.put(name, info);
+ }
+ }
+ }
+
+ // Add all the db files that were not getting transferred..
+ KahaDBStore store = replicationServer.getStore();
+ Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+ for (DataFile df : journalFiles.values()) {
+ String name = "journal-" + df.getDataFileId();
+ // Did we have a transfer in progress for that file already?
+ if( infosMap.containsKey(name) ) {
+ continue;
+ }
+ infosMap.put(name, ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
+ }
+ // Report the live page file only when no partially transferred database was found.
+ if( !infosMap.containsKey("database") ) {
+ File pageFile = store.getPageFile().getFile();
+ if( pageFile.exists() ) {
+ infosMap.put("database", ReplicationSupport.createInfo("database", pageFile, 0, pageFile.length()));
+ }
+ }
+
+ ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.size());
+ for (PBFileInfo info : infosMap.values()) {
+ infos.add(info);
+ }
+ payload.setCurrentFilesList(infos);
+
+ frame.setPayload(payload);
+ LOG.info("Sending master slave init command: " + payload);
+ // A fresh init is being sent, so this slave is not online until the sync has completed.
+ online = false;
+ transport.oneway(frame);
+ }
+ }
+
+ /**
+  * Drops the connection to the master and resets all transfer and
+  * journal-update state so that a later doStart() begins from scratch.
+  */
+ private void doStop() throws Exception, IOException {
+     synchronized (transferMutex) {
+         if (transport != null) {
+             transport.stop();
+             transport = null;
+         }
+
+         // Stop any current transfer sessions.
+         for (TransferSession active : transferSessions) {
+             active.stop();
+         }
+
+         // Forget everything that belonged to the previous sync attempt.
+         transferQueue.clear();
+         initResponse = null;
+         bulkFiles.clear();
+         online = false;
+
+         if (journalUpateFile != null) {
+             journalUpateFile.close();
+             journalUpateFile = null;
+         }
+         journalUpdateFileId = 0;
+     }
+ }
+
+ /**
+  * Follows a change of master: when the configuration names a master
+  * other than the one currently followed (or none is followed yet), the
+  * session is torn down and re-established against the new master.
+  * Failures are logged, not thrown.
+  *
+  * @param config the new cluster configuration
+  */
+ public void onClusterChange(ClusterState config) {
+     synchronized (transferMutex) {
+         try {
+             final boolean sameMaster = master != null && master.equals(config.getMaster());
+             if (!sameMaster) {
+                 master = config.getMaster();
+                 doStop();
+                 doStart();
+             }
+         } catch (Exception e) {
+             LOG.error("Could not restart syncing with new master: "+config.getMaster()+", due to: "+e,e);
+         }
+     }
+ }
+
+ /**
+  * Dispatches a frame received from the master to its handler. Frame
+  * types not listed here are ignored. Any exception is treated as a
+  * session failure.
+  *
+  * @param command the received object, expected to be a ReplicationFrame
+  */
+ public void onCommand(Object command) {
+     try {
+         final ReplicationFrame frame = (ReplicationFrame) command;
+         final PBType type = frame.getHeader().getType();
+         switch (type) {
+         case SLAVE_INIT_RESPONSE:
+             onSlaveInitResponse(frame, (PBSlaveInitResponse) frame.getPayload());
+             break;
+         case JOURNAL_UPDATE:
+             onJournalUpdate(frame, (PBJournalUpdate) frame.getPayload());
+             break;
+         default:
+             // Nothing to do for other frame types.
+             break;
+         }
+     } catch (Exception e) {
+         failed(e);
+     }
+ }
+
+ /**
+  * Treats an I/O error as a session failure by delegating to
+  * {@link #failed(Throwable)}.
+  *
+  * @param error the I/O error that was reported
+  */
+ public void onException(IOException error) {
+ failed(error);
+ }
+
+ public void failed(Throwable error) {
+ try {
+ if( started.get() ) {
+ LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
+ doStop();
+ // Wait a little an try to establish the session again..
+ Thread.sleep(1000);
+ doStart();
+ }
+ } catch (Exception ignore) {
+ }
+ }
+
+ /**
+  * Intentionally empty: no action is taken here.
+  * NOTE(review): presumably the TransportListener callback for an interrupted transport - confirm.
+  */
+ public void transportInterupted() {
+ }
+ /**
+  * Intentionally empty: no action is taken here.
+  * NOTE(review): presumably the TransportListener callback for a resumed transport - confirm.
+  */
+ public void transportResumed() {
+ }
+
+ /**
+  * Applies a journal update sent by the master. The data is written at the
+  * given offset of the matching journal file; while the slave is still
+  * offline the write goes to a temporary replication file that is recorded
+  * in {@code bulkFiles}, otherwise it goes to the real replication file and
+  * the store is asked to incrementally recover the appended records.
+  *
+  * @param frame the frame that carried the update
+  * @param update the journal update (location, data and ack request)
+  * @throws IOException if the ack cannot be sent or the file cannot be written
+  */
+ private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
+
+     // Send the ack back as soon as we get the update.. yeah it's a little dirty to ack before it's on disk,
+     // but chances are low that both machines are going to lose power at the same time and this way,
+     // we reduce the latency the master sees from us.
+     if (update.getSendAck()) {
+         ReplicationFrame ack = new ReplicationFrame();
+         ack.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE_ACK));
+         ack.setPayload(update.getLocation());
+         transport.oneway(ack);
+     }
+
+     // TODO: actually do the disk write in an async thread so that this thread can
+     // start reading in the next journal update.
+
+     boolean onlineRecovery = false;
+     PBJournalLocation location = update.getLocation();
+     byte[] data = update.getData().toByteArray();
+     synchronized (transferMutex) {
+         if (journalUpateFile == null || journalUpdateFileId != location.getFileId()) {
+             if (journalUpateFile != null) {
+                 try {
+                     journalUpateFile.close();
+                 } finally {
+                     // Never keep a closed handle around: if closing or opening the
+                     // next file fails, the following update must reopen the file.
+                     journalUpateFile = null;
+                 }
+             }
+             File file;
+             String name = "journal-" + location.getFileId();
+             if (!online) {
+                 file = replicationServer.getTempReplicationFile(name, 0);
+                 if (!bulkFiles.containsKey(name)) {
+                     bulkFiles.put(name, new PBFileInfo().setName(name));
+                 }
+             } else {
+                 // Once the data has been synced.. we are going to
+                 // go into an online recovery mode...
+                 file = replicationServer.getReplicationFile(name);
+             }
+             journalUpateFile = new RandomAccessFile(file, "rw");
+             journalUpdateFileId = location.getFileId();
+         }
+
+         journalUpateFile.seek(location.getOffset());
+         journalUpateFile.write(data);
+         // Read the flag under the lock; recovery itself runs outside of it.
+         onlineRecovery = online;
+     }
+
+     if (onlineRecovery) {
+         KahaDBStore store = replicationServer.getStore();
+         // Let the journal know that we appended to one of its files..
+         store.getJournal().appendedExternally(ReplicationSupport.convert(location), data.length);
+         // Now incrementally recover those records.
+         store.incrementalRecover();
+     }
+ }
+
+
+ /**
+  * Completes the bulk synchronization: closes the store and the journal
+  * update file, moves every transferred temporary file into its final
+  * place, deletes the files listed by the init response, marks the slave
+  * online, reopens the store, publishes the SLAVE_ONLINE member status and
+  * finally tells the master that the slave is online. Any failure is
+  * logged and handed to {@link #failed(Throwable)}.
+  */
+ private void commitBulkTransfer() {
+     try {
+
+         synchronized (transferMutex) {
+
+             LOG.info("Slave synhcronization complete, going online...");
+             replicationServer.getStore().close();
+
+             if (journalUpateFile != null) {
+                 journalUpateFile.close();
+                 journalUpateFile = null;
+             }
+
+             // If we got a new snapshot of the database, then we need to
+             // delete its assisting files too.
+             if (bulkFiles.containsKey("database")) {
+                 PageFile pageFile = replicationServer.getStore().getPageFile();
+                 pageFile.getRecoveryFile().delete();
+                 pageFile.getFreeFile().delete();
+             }
+
+             for (PBFileInfo info : bulkFiles.values()) {
+                 File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+                 File to = replicationServer.getReplicationFile(info.getName());
+                 to.getParentFile().mkdirs();
+                 move(from, to);
+             }
+
+             delete(initResponse.getDeleteFilesList());
+             online = true;
+
+             replicationServer.getStore().open();
+
+             replicationServer.getCluster().setMemberStatus(replicationServer.createStatus(State.SLAVE_ONLINE));
+             LOG.info("Slave is now online. We are now eligible to become the master.");
+         }
+
+         // Let the master know we are now online.
+         ReplicationFrame frame = new ReplicationFrame();
+         frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
+         transport.oneway(frame);
+
+     } catch (Throwable e) {
+         // Report through the logger instead of dumping the stack trace to stderr.
+         LOG.error("Could not commit the bulk transfer: " + e, e);
+         failed(e);
+     }
+ }
+
+ /**
+  * Handles the master's answer to the slave init request. Every file the
+  * master wants copied is recorded in {@code bulkFiles}; for a journal file
+  * that already exists locally and is only being appended to, the bytes up
+  * to the start offset are first copied from the local file into the
+  * temporary file. The transfer queue is then replaced with the files to
+  * copy and transfer sessions are started.
+  *
+  * @param frame the frame that carried the response
+  * @param response the init response listing files to copy and delete
+  * @throws Exception if a file cannot be resolved or the local copy fails
+  */
+ private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
+     LOG.info("Got init response: " + response);
+     initResponse = response;
+
+     synchronized (transferMutex) {
+         bulkFiles.clear();
+
+         List<PBFileInfo> infos = response.getCopyFilesList();
+         for (PBFileInfo info : infos) {
+
+             bulkFiles.put(info.getName(), info);
+             File target = replicationServer.getReplicationFile(info.getName());
+             // Are we just appending to an existing journal file?
+             if (info.getName().startsWith("journal-") && info.getStart() > 0 && target.exists()) {
+                 // Then copy across the first bits..
+                 File tempFile = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+                 // Make sure the temp directory exists, as the transfer sessions do
+                 // before they write into it.
+                 File tempDir = tempFile.getParentFile();
+                 if (tempDir != null) {
+                     tempDir.mkdirs();
+                 }
+
+                 // Open both streams inside the try so that the input stream is
+                 // released even when the output stream cannot be created.
+                 FileInputStream is = null;
+                 FileOutputStream os = null;
+                 try {
+                     is = new FileInputStream(target);
+                     os = new FileOutputStream(tempFile);
+                     copy(is, os, info.getStart());
+                 } finally {
+                     if (is != null) {
+                         try { is.close(); } catch (Throwable ignored) { /* best effort */ }
+                     }
+                     if (os != null) {
+                         try { os.close(); } catch (Throwable ignored) { /* best effort */ }
+                     }
+                 }
+             }
+         }
+
+         transferQueue.clear();
+         transferQueue.addAll(infos);
+     }
+     addTransferSession();
+ }
+
+ /**
+  * Takes the next file to transfer off the head of the transfer queue.
+  *
+  * @return the next file info, or {@code null} when the queue is empty
+  * @throws Exception declared for callers; nothing visible here throws a checked exception
+  */
+ private PBFileInfo dequeueTransferQueue() throws Exception {
+     synchronized (transferMutex) {
+         return transferQueue.isEmpty() ? null : transferQueue.removeFirst();
+     }
+ }
+
+ /**
+  * Starts transfer sessions while the main transport is up, files are
+  * queued and fewer than {@code MAX_TRANSFER_SESSIONS} sessions exist. When
+  * neither queued files nor sessions remain, the bulk transfer is committed.
+  */
+ private void addTransferSession() {
+     synchronized (transferMutex) {
+         while (transport != null && !transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
+             TransferSession transferSession = new TransferSession();
+             transferSessions.add(transferSession);
+             try {
+                 transferSession.start();
+             } catch (Exception e) {
+                 transferSessions.remove(transferSession);
+                 // The loop condition is unchanged after a failed start, so retrying
+                 // right away would spin forever while holding the lock. Report the
+                 // problem and leave the queued files for a later attempt.
+                 LOG.warn("Could not start a file transfer session: " + e, e);
+                 break;
+             }
+         }
+         // Once we are done processing all the transfers..
+         if (transferQueue.isEmpty() && transferSessions.isEmpty()) {
+             commitBulkTransfer();
+         }
+     }
+ }
+
+ /**
+  * Moves a file, replacing the destination if it exists. A rename is tried
+  * first; when that does not work the content is copied and the source is
+  * deleted afterwards.
+  *
+  * @param from the file to move
+  * @param to the destination file
+  * @throws IOException if the fallback copy fails or ends prematurely
+  */
+ private void move(File from, File to) throws IOException {
+
+     to.delete();
+     // If a simple rename/mv does not work..
+     if (!from.renameTo(to)) {
+
+         // Copy and Delete.
+         FileInputStream is = null;
+         FileOutputStream os = null;
+         try {
+             is = new FileInputStream(from);
+             os = new FileOutputStream(to);
+
+             // A single transferFrom() call may move fewer bytes than requested,
+             // so keep going until the whole source has been copied.
+             long size = is.getChannel().size();
+             long position = 0;
+             while (position < size) {
+                 long transferred = os.getChannel().transferFrom(is.getChannel(), position, size - position);
+                 if (transferred <= 0) {
+                     throw new IOException("Unexpected end of file while copying " + from + " to " + to);
+                 }
+                 position += transferred;
+             }
+         } finally {
+             if (is != null) {
+                 try {
+                     is.close();
+                 } catch (Throwable ignored) {
+                     // best effort
+                 }
+             }
+             if (os != null) {
+                 try {
+                     os.close();
+                 } catch (Throwable ignored) {
+                     // best effort
+                 }
+             }
+         }
+         from.delete();
+     }
+ }
+
+ /**
+  * A file transfer session with the master over its own transport. The
+  * session requests one queued file at a time, writes the received payload
+  * into the matching temporary replication file and stops itself once the
+  * transfer queue is empty or a failure occurs.
+  */
+ class TransferSession implements Service, TransportListener {
+
+     Transport transport;
+     // File currently being transferred, or null when idle.
+     private PBFileInfo info;
+     // Temporary file the current transfer is written to.
+     private File toFile;
+     private AtomicBoolean stopped = new AtomicBoolean();
+     // Timestamp (ms) at which the current file was requested.
+     private long transferStart;
+
+     /**
+      * Connects to the current master and requests the first queued file.
+      *
+      * @throws Exception if the transport cannot be created or started
+      */
+     public void start() throws Exception {
+         LOG.info("File transfer session started.");
+         transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
+         transport.setTransportListener(this);
+         transport.start();
+         sendNextRequestOrStop();
+     }
+
+     /**
+      * Requests the next queued file from the master, or stops the session
+      * when nothing is left to transfer. Errors end up in {@link #failed(Exception)}.
+      */
+     private void sendNextRequestOrStop() {
+         try {
+             PBFileInfo next = dequeueTransferQueue();
+             if (next != null) {
+
+                 toFile = replicationServer.getTempReplicationFile(next.getName(), next.getSnapshotId());
+                 this.info = next;
+
+                 ReplicationFrame frame = new ReplicationFrame();
+                 frame.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER));
+                 frame.setPayload(next);
+
+                 LOG.info("Requesting file: " + next.getName());
+                 transferStart = System.currentTimeMillis();
+
+                 transport.oneway(frame);
+             } else {
+                 stop();
+             }
+
+         } catch (Exception e) {
+             failed(e);
+         }
+     }
+
+     /**
+      * Stops the session once. A transfer that is still in progress is put
+      * back at the end of the transfer queue, the transport is stopped and
+      * the session removes itself from the list of active sessions.
+      *
+      * @throws Exception if the transport fails to stop
+      */
+     public void stop() throws Exception {
+         if (stopped.compareAndSet(false, true)) {
+             LOG.info("File transfer session stopped.");
+             synchronized (transferMutex) {
+                 if (info != null) {
+                     transferQueue.addLast(info);
+                 }
+                 info = null;
+             }
+             try {
+                 // The transport is null when start() failed before connecting.
+                 if (transport != null) {
+                     transport.stop();
+                 }
+             } finally {
+                 // Always unregister, even when stopping the transport fails;
+                 // otherwise the session would occupy a slot forever and the
+                 // bulk transfer could never be committed.
+                 synchronized (transferMutex) {
+                     transferSessions.remove(TransferSession.this);
+                     addTransferSession();
+                 }
+             }
+         }
+     }
+
+     /**
+      * Receives the content of the requested file and writes it into the
+      * temporary file starting at the file info's start offset, then asks
+      * for the next file.
+      *
+      * @param command the received object, expected to be a {@code ReplicationFrame}
+      *                whose payload is an {@code InputStream}
+      */
+     public void onCommand(Object command) {
+         try {
+             ReplicationFrame frame = (ReplicationFrame) command;
+             InputStream is = (InputStream) frame.getPayload();
+             toFile.getParentFile().mkdirs();
+
+             RandomAccessFile os = new RandomAccessFile(toFile, "rw");
+             try {
+                 // Seek inside the try so the file is closed if positioning fails.
+                 os.seek(info.getStart());
+                 copy(is, os, frame.getHeader().getPayloadSize());
+                 long transferTime = System.currentTimeMillis() - this.transferStart;
+                 // Kilobytes divided by seconds; clamp the duration to 1 ms so a
+                 // very fast transfer does not divide by zero.
+                 float rate = (frame.getHeader().getPayloadSize() / 1024f) / (Math.max(transferTime, 1L) / 1000f);
+                 LOG.info("File " + info.getName() + " transfered in " + transferTime + " (ms) at " + rate + " Kb/Sec");
+             } finally {
+                 os.close();
+             }
+             this.info = null;
+             this.toFile = null;
+
+             sendNextRequestOrStop();
+         } catch (Exception e) {
+             failed(e);
+         }
+     }
+
+     /**
+      * Treats an I/O error as a failure of this session.
+      *
+      * @param error the I/O error that was reported
+      */
+     public void onException(IOException error) {
+         failed(error);
+     }
+
+     /**
+      * Logs the failure, unless the session was already stopped, and stops
+      * the session.
+      *
+      * @param error the cause of the failure
+      */
+     public void failed(Exception error) {
+         try {
+             if (!stopped.get()) {
+                 Object remote = (transport != null) ? transport.getRemoteAddress() : "<not connected>";
+                 LOG.warn("Replication session failure: " + remote, error);
+             }
+             stop();
+         } catch (Exception e) {
+             LOG.warn("Could not stop the failed file transfer session: " + e, e);
+         }
+     }
+
+     /** Intentionally empty: no action is taken here. */
+     public void transportInterupted() {
+     }
+
+     /** Intentionally empty: no action is taken here. */
+     public void transportResumed() {
+     }
+
+ }
+
+ private void copy(InputStream is, OutputStream os, long length) throws IOException {
+ byte buffer[] = new byte[1024 * 4];
+ int c = 0;
+ long pos = 0;
+ while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
+ os.write(buffer, 0, c);
+ pos += c;
+ }
+ }
+
+ private void copy(InputStream is, DataOutput os, long length) throws IOException {
+ byte buffer[] = new byte[1024 * 4];
+ int c = 0;
+ long pos = 0;
+ while (pos < length && ((c = is.read(buffer, 0, (int) Math.min(buffer.length, length - pos))) >= 0)) {
+ os.write(buffer, 0, c);
+ pos += c;
+ }
+ }
+
+ /**
+  * Deletes the named replication files on a best-effort basis: a file that
+  * cannot be resolved or removed is reported in the log and skipped.
+  *
+  * @param files the names of the replication files to delete
+  */
+ private void delete(List<String> files) {
+     for (String fn : files) {
+         try {
+             File file = replicationServer.getReplicationFile(fn);
+             // delete() also returns false when the file was never there,
+             // which is not worth a warning.
+             if (!file.delete() && file.exists()) {
+                 LOG.warn("Could not delete replication file: " + file);
+             }
+         } catch (IOException e) {
+             LOG.warn("Could not resolve replication file to delete: " + fn, e);
+         }
+     }
+ }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+
+public class ReplicationSupport {
+
+ static public PBJournalLocation convert(Location loc) {
+ if( loc==null ) {
+ return null;
+ }
+ return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
+ }
+
+ static public Location convert(PBJournalLocation location) {
+ Location rc = new Location();
+ rc.setDataFileId(location.getFileId());
+ rc.setOffset(location.getOffset());
+ return rc;
+ }
+
+
+ static public long copyAndChecksum(File input, File output) throws IOException {
+ FileInputStream is = null;
+ FileOutputStream os = null;
+ try {
+ is = new FileInputStream(input);
+ os = new FileOutputStream(output);
+
+ byte buffer[] = new byte[1024 * 4];
+ int c;
+
+ Checksum checksum = new Adler32();
+ while ((c = is.read(buffer)) >= 0) {
+ os.write(buffer, 0, c);
+ checksum.update(buffer, 0, c);
+ }
+ return checksum.getValue();
+
+ } finally {
+ try {
+ is.close();
+ } catch(Throwable e) {
+ }
+ try {
+ os.close();
+ } catch(Throwable e) {
+ }
+ }
+ }
+
+ public static PBFileInfo createInfo(String name, File file, long start, long length) throws IOException {
+ PBFileInfo rc = new PBFileInfo();
+ rc.setName(name);
+ rc.setChecksum(checksum(file, start, length));
+ rc.setStart(start);
+ rc.setEnd(length);
+ return rc;
+ }
+
+ public static long checksum(File file, long start, long end) throws IOException {
+ RandomAccessFile raf = new RandomAccessFile(file, "r");
+ try {
+ Checksum checksum = new Adler32();
+ byte buffer[] = new byte[1024 * 4];
+ int c;
+ long pos = start;
+ raf.seek(start);
+
+ while (pos < end && (c = raf.read(buffer, 0, (int) Math.min(end - pos, buffer.length))) >= 0) {
+ checksum.update(buffer, 0, c);
+ pos += c;
+ }
+
+ return checksum.getValue();
+ } finally {
+ try {
+ raf.close();
+ } catch (Throwable e) {
+ }
+ }
+ }
+
+}