You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ba...@apache.org on 2014/08/26 17:30:56 UTC
svn commit: r1620634 - in /jackrabbit/oak/trunk/oak-tarmk-failover/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/failover/
main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/
main/java/org/apache/jackrabbit/oak/plugins/segme...
Author: baedke
Date: Tue Aug 26 15:30:56 2014
New Revision: 1620634
URL: http://svn.apache.org/r1620634
Log:
OAK-1915: TarMK failover 2.0
Bug fixed: master startup occasionally deadlocked due to race conditions.
Features added: MBean support, compression, checksums, master access restriction by ip address, client can be started via oak-run runmode.
Added unit tests.
Added:
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
Modified:
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.ObservablePartnerMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CommunicationObserver {
+
+ private class CommunicationPartnerMBean implements ObservablePartnerMBean {
+ private final ObjectName mbeanName;
+ private final String clientName;
+ public String lastRequest;
+ public Date lastSeen;
+ public String remoteAddress;
+ public int remotePort;
+
+ public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
+ this.clientName = clientName;
+ this.mbeanName = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
+ }
+
+ public ObjectName getMBeanName() {
+ return this.mbeanName;
+ }
+
+ @Override
+ public String getName() {
+ return this.clientName;
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return this.remoteAddress;
+ }
+
+ @Override
+ public String getLastRequest() {
+ return this.lastRequest;
+ }
+
+ @Override
+ public int getRemotePort() {
+ return this.remotePort;
+ }
+
+ @Override
+ public String getLastSeenTimestamp() {
+ return this.lastSeen == null ? null : this.lastSeen.toString();
+ }
+ }
+
+ private static final Logger log = LoggerFactory
+ .getLogger(CommunicationObserver.class);
+
+ private final String identifier;
+ private final Map<String, CommunicationPartnerMBean> partnerDetails;
+
+ public CommunicationObserver(String myID) {
+ this.identifier = myID;
+ this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
+ }
+
+ public void unregister() {
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+ try {
+ jmxServer.unregisterMBean(m.getMBeanName());
+ }
+ catch (Exception e) {
+ log.error("error unregistering mbean for client '" + m.getName() + "'", e);
+ }
+ }
+ }
+
+ public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
+ CommunicationPartnerMBean m = this.partnerDetails.get(client);
+ boolean register = false;
+ if (m == null) {
+ m = new CommunicationPartnerMBean(client);
+ m.remoteAddress = remote.getAddress().getHostAddress();
+ m.remotePort = remote.getPort();
+ register = true;
+ }
+ m.lastSeen = new Date();
+ m.lastRequest = request;
+ this.partnerDetails.put(client, m);
+ if (register) {
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
+ }
+ catch (Exception e) {
+ log.error("can register mbean for client '" + m.getName() + "'", e);
+ }
+ }
+ }
+
+ public String getID() {
+ return this.identifier;
+ }
+}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Tue Aug 26 15:30:56 2014
@@ -27,6 +27,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.SnappyFramedDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
@@ -34,15 +35,24 @@ import io.netty.util.concurrent.DefaultE
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.Closeable;
+import java.lang.management.ManagementFactory;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class FailoverClient implements Runnable, Closeable {
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+public final class FailoverClient implements FailoverStatusMBean, Runnable, Closeable {
+ public static final String CLIENT_ID_PROPERTY_NAME = "failOverID";
private static final Logger log = LoggerFactory
.getLogger(FailoverClient.class);
@@ -52,20 +62,60 @@ public final class FailoverClient implem
private int readTimeoutMs = 10000;
private final FailoverStore store;
+ private final CommunicationObserver observer;
private FailoverClientHandler handler;
private EventLoopGroup group;
private EventExecutorGroup executor;
+ private boolean running;
+ private String state;
public FailoverClient(String host, int port, SegmentStore store) {
+ this.state = STATUS_INITIALIZING;
this.host = host;
this.port = port;
this.store = new FailoverStore(store);
+ String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
+ this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s);
+
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.registerMBean(new StandardMBean(this, FailoverStatusMBean.class), new ObjectName(this.getMBeanName()));
+ }
+ catch (Exception e) {
+ log.error("can register failover status mbean", e);
+ }
}
- public void run() {
+ public String getMBeanName() {
+ return FailoverStatusMBean.JMX_NAME + ",id=\"" + this.observer.getID() + "\"";
+ }
+
+ public void close() {
+ stop();
+ state = STATUS_CLOSING;
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.unregisterMBean(new ObjectName(this.getMBeanName()));
+ }
+ catch (Exception e) {
+ log.error("can unregister failover status mbean", e);
+ }
+ observer.unregister();
+ if (group != null && !group.isShuttingDown()) {
+ group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ if (executor != null && !executor.isShuttingDown()) {
+ executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ state = STATUS_CLOSED;
+ }
+ public void run() {
+ state = STATUS_STARTING;
this.executor = new DefaultEventExecutorGroup(4);
- this.handler = new FailoverClientHandler(this.store, executor);
+ this.handler = new FailoverClientHandler(this.store, executor, this.observer);
group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
@@ -83,33 +133,49 @@ public final class FailoverClient implem
// WriteTimeoutHandler & ReadTimeoutHandler
p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
readTimeoutMs, TimeUnit.MILLISECONDS));
-
p.addLast(new StringEncoder(CharsetUtil.UTF_8));
+ p.addLast(new SnappyFramedDecoder(true));
p.addLast(new RecordIdDecoder(store));
p.addLast(executor, handler);
}
});
try {
// Start the client.
+ running = true;
+ state = STATUS_RUNNING;
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} catch (Exception e) {
log.error("Failed synchronizing state.", e);
- } finally {
- close();
+ stop();
}
}
@Override
- public void close() {
- if (group != null && !group.isShuttingDown()) {
- group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
- if (executor != null && !executor.isShuttingDown()) {
- executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
+ public String getMode() {
+ return "client: " + this.observer.getID();
+ }
+
+ @Override
+ public boolean isRunning() { return running;}
+
+ @Override
+ public void start() {
+ if (!running) run();
+ }
+
+ @Override
+ public void stop() {
+ //TODO running flag doesn't make sense this way, since run() is usually scheduled to be called repeatedly.
+ if (running) {
+ running = false;
+ state = STATUS_STOPPED;
}
}
+
+ @Override
+ public String getStatus() {
+ return this.state;
+ }
}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java Tue Aug 26 15:30:56 2014
@@ -28,9 +28,11 @@ import io.netty.util.concurrent.GenericF
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
+import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentDecoder;
import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
@@ -45,6 +47,7 @@ public class FailoverClientHandler exten
private final FailoverStore store;
private final EventExecutorGroup executor;
+ private final CommunicationObserver observer;
private EventExecutorGroup preloaderExecutor;
private EventExecutorGroup loaderExecutor;
@@ -53,9 +56,10 @@ public class FailoverClientHandler exten
private Promise<RecordId> headPromise;
public FailoverClientHandler(final FailoverStore store,
- EventExecutorGroup executor) {
+ EventExecutorGroup executor, CommunicationObserver observer) {
this.store = store;
this.executor = executor;
+ this.observer = observer;
}
@Override
@@ -91,7 +95,7 @@ public class FailoverClientHandler exten
}
}
});
- ctx.writeAndFlush(newGetHeadReq()).addListener(
+ ctx.writeAndFlush(newGetHeadReq(this.observer.getID())).addListener(
new FailedRequestListener(headPromise));
}
@@ -114,7 +118,7 @@ public class FailoverClientHandler exten
loaderExecutor = new DefaultEventExecutorGroup(4);
SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
- preloaderExecutor, loaderExecutor);
+ preloaderExecutor, loaderExecutor, this.observer.getID());
ctx.pipeline().addLast(loaderExecutor, h2);
h1.channelActive(ctx);
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java Tue Aug 26 15:30:56 2014
@@ -48,6 +48,7 @@ public class SegmentLoaderHandler extend
.getLogger(SegmentLoaderHandler.class);
private final FailoverStore store;
+ private final String clientID;
private final RecordId head;
private final EventExecutorGroup preloaderExecutor;
private final EventExecutorGroup loaderExecutor;
@@ -60,11 +61,13 @@ public class SegmentLoaderHandler extend
public SegmentLoaderHandler(final FailoverStore store, RecordId head,
EventExecutorGroup preloaderExecutor,
- EventExecutorGroup loaderExecutor) {
+ EventExecutorGroup loaderExecutor,
+ String clientID) {
this.store = store;
this.head = head;
this.preloaderExecutor = preloaderExecutor;
this.loaderExecutor = loaderExecutor;
+ this.clientID = clientID;
}
@Override
@@ -103,7 +106,7 @@ public class SegmentLoaderHandler extend
@Override
public Segment readSegment(final SegmentId id) {
- ctx.writeAndFlush(newGetSegmentReq(id)).addListener(reqListener);
+ ctx.writeAndFlush(newGetSegmentReq(this.clientID, id)).addListener(reqListener);
return getSegment();
}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java Tue Aug 26 15:30:56 2014
@@ -26,14 +26,37 @@ public class Messages {
public static final byte HEADER_SEGMENT = 0x01;
public static final String GET_HEAD = "h";
-
public static final String GET_SEGMENT = "s.";
- public static String newGetHeadReq() {
- return GET_HEAD + "\r\n";
+ private static final String MAGIC = "FailOver-CMD@";
+ private static final String SEPARATOR = ":";
+
+ private static String newRequest(String clientID, String body) {
+ return MAGIC + (clientID == null ? "" : clientID.replace(SEPARATOR, "#")) + SEPARATOR + body + "\r\n";
+ }
+
+ public static String newGetHeadReq(String clientID) {
+ return newRequest(clientID, GET_HEAD);
+ }
+
+ public static String newGetSegmentReq(String clientID, SegmentId sid) {
+ return newRequest(clientID, GET_SEGMENT + sid.toString());
+ }
+
+ public static String extractMessageFrom(String payload) {
+ if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
+ int i = payload.indexOf(SEPARATOR);
+ return payload.substring(i + 1);
+ }
+ return null;
}
- public static String newGetSegmentReq(SegmentId sid) {
- return GET_SEGMENT + sid.toString() + "\r\n";
+ public static String extractClientFrom(String payload) {
+ if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
+ payload = payload.substring(MAGIC.length());
+ int i = payload.indexOf(SEPARATOR);
+ return payload.substring(0, i);
+ }
+ return null;
}
}
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.failover.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import javax.annotation.Nonnull;
+
+public interface FailoverStatusMBean {
+ public static final String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"FailOver\"";
+ public static final String STATUS_INITIALIZING = "initializing";
+ public static final String STATUS_STOPPED = "stopped";
+ public static final String STATUS_STARTING = "starting";
+ public static final String STATUS_RUNNING = "running";
+ public static final String STATUS_CLOSING = "closing";
+ public static final String STATUS_CLOSED = "closed";
+
+ @Nonnull
+ @Description("master or client")
+ String getMode();
+
+ @Description("current status of the service")
+ String getStatus();
+
+ @Description("instance is running")
+ boolean isRunning();
+
+ @Description("stop the failover communication")
+ void stop();
+
+ @Description("start the failover communication")
+ void start();
+}
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+public interface ObservablePartnerMBean {
+
+ @Nonnull
+ @Description("name of the partner")
+ String getName();
+
+ @Description("IP of the remote")
+ String getRemoteAddress();
+
+ @Description("Last request")
+ String getLastRequest();
+
+ @Description("Port of the remote")
+ int getRemotePort();
+
+ @CheckForNull
+ @Description("Time the remote instance was last contacted")
+ String getLastSeenTimestamp();
+}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java Tue Aug 26 15:30:56 2014
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.segment.failover.server;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -27,30 +28,62 @@ import io.netty.channel.nio.NioEventLoop
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.compression.SnappyFramedEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;
import java.io.Closeable;
+import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
+import io.netty.util.concurrent.Future;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdEncoder;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class FailoverServer implements Closeable {
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+public class FailoverServer implements FailoverStatusMBean, Closeable {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(FailoverServer.class);
private final int port;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ServerBootstrap b;
+ private final CommunicationObserver observer;
+ private final FailoverServerHandler handler;
+ private ChannelFuture channelFuture;
+ private boolean running;
public FailoverServer(int port, final SegmentStore store) {
+ this(port, store, null);
+ }
+
+ public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges) {
this.port = port;
+ observer = new CommunicationObserver("master");
+ handler = new FailoverServerHandler(store, observer, allowedClientIPRanges);
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.registerMBean(new StandardMBean(this, FailoverStatusMBean.class), new ObjectName(this.getMBeanName()));
+ }
+ catch (Exception e) {
+ log.error("can register failover status mbean", e);
+ }
+
b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
@@ -67,32 +100,92 @@ public class FailoverServer implements C
ChannelPipeline p = ch.pipeline();
p.addLast(new LineBasedFrameDecoder(8192));
p.addLast(new StringDecoder(CharsetUtil.UTF_8));
+ p.addLast(new SnappyFramedEncoder());
p.addLast(new RecordIdEncoder());
p.addLast(new SegmentEncoder());
- p.addLast(new FailoverServerHandler(store));
+ p.addLast(handler);
}
});
}
- public void start() {
- try {
- b.bind(port).sync().channel().closeFuture().sync();
- } catch (InterruptedException e) {
- close();
- }
+ public String getMBeanName() {
+ return FailoverStatusMBean.JMX_NAME + ",id=" + this.port;
}
- @Override
public void close() {
+ stop();
+ handler.state = STATUS_CLOSING;
+ observer.unregister();
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.unregisterMBean(new ObjectName(this.getMBeanName()));
+ }
+ catch (Exception e) {
+ log.error("can unregister failover status mbean", e);
+ }
if (bossGroup != null && !bossGroup.isShuttingDown()) {
- bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
+ bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
}
if (workerGroup != null && !workerGroup.isShuttingDown()) {
- workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
- ;
+ workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+ }
+ handler.state = STATUS_CLOSED;
+ }
+
+ @Override
+ public void start() {
+ if (running) return;
+
+ running = true;
+ this.handler.state = STATUS_STARTING;
+
+ Future<?> startup = bossGroup.submit(new Runnable() {
+ @Override
+ public void run() {
+ //netty 4.0.20 has a race condition issue with
+ //asynchronous channel registration. As a workaround
+ //we bind asynchronously from the boss event group to make
+ //the channel registration synchronous.
+ //Note that now this method will return immediately.
+ channelFuture = b.bind(port);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ running = true;
+ channelFuture.sync().channel().closeFuture().sync();
+ } catch (InterruptedException e) {
+ FailoverServer.this.stop();
+ }
+ }
+ }.start();
+ }
+ });
+ if (!startup.awaitUninterruptibly(10000)) {
+ log.error("FailoverServer failed to start within 10 seconds and will be canceled");
+ startup.cancel(true);
+ }
+ }
+
+ @Override
+ public String getMode() {
+ return "master";
+ }
+
+ @Override
+ public boolean isRunning() { return running; }
+
+ @Override
+ public void stop() {
+ if (running) {
+ running = false;
+ this.handler.state = STATUS_STOPPED;
+ channelFuture.channel().disconnect();
}
}
+ @Override
+ public String getStatus() {
+ return handler == null ? STATUS_INITIALIZING : handler.state;
+ }
}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java Tue Aug 26 15:30:56 2014
@@ -18,6 +18,9 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.failover.server;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -30,6 +33,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +45,14 @@ public class FailoverServerHandler exten
.getLogger(FailoverServerHandler.class);
private final SegmentStore store;
+ private final CommunicationObserver observer;
+ private final String[] allowedIPRanges;
+ public String state;
- public FailoverServerHandler(SegmentStore store) {
+ public FailoverServerHandler(SegmentStore store, CommunicationObserver observer, String[] allowedIPRanges) {
this.store = store;
+ this.observer = observer;
+ this.allowedIPRanges = allowedIPRanges;
}
private RecordId headId() {
@@ -53,49 +62,117 @@ public class FailoverServerHandler exten
return null;
}
- @Override
- public void channelRead0(ChannelHandlerContext ctx, String request)
- throws Exception {
- if (Messages.GET_HEAD.equalsIgnoreCase(request)) {
- RecordId r = headId();
- if (r != null) {
- ctx.writeAndFlush(r);
- } else {
- ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
+ private static long ipToLong(InetAddress ip) {
+ byte[] octets = ip.getAddress();
+ long result = 0;
+ for (byte octet : octets) {
+ result <<= 8;
+ result |= octet & 0xff;
+ }
+ return result;
+ }
+
+ private boolean clientAllowed(InetSocketAddress client) {
+ if (this.allowedIPRanges != null) {
+ for (String s : this.allowedIPRanges) {
+ try {
+ if (ipToLong(InetAddress.getByName(s)) == ipToLong(client.getAddress())) {
+ return true;
+ }
+ }
+ catch (UnknownHostException ignored) { /* it's an ip range */ }
+ int i = s.indexOf('-');
+ if (i > 0) {
+ try {
+ long startIPRange = ipToLong(InetAddress.getByName(s.substring(0, i).trim()));
+ long endIPRange = ipToLong(InetAddress.getByName(s.substring(i + 1).trim()));
+ long ipl = ipToLong(client.getAddress());
+ if (startIPRange <= ipl && ipl <= endIPRange) return true;
+ }
+ catch (Exception e) {
+ log.warn("invalid IP-range format: " + s);
+ }
+ }
}
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+ state = "Channel registered";
+ super.channelRegistered(ctx);
+ }
+
+ @Override
+ public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+ state = "Channel active";
+ super.channelActive(ctx);
+ }
- } else if (request.startsWith(Messages.GET_SEGMENT)) {
- String sid = request.substring(Messages.GET_SEGMENT.length());
- log.debug("request segment id {}", sid);
- UUID uuid = UUID.fromString(sid);
+ @Override
+ public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+ state = "Channel inactive";
+ super.channelInactive(ctx);
+ }
- Segment s = null;
+ @Override
+ public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+ state = "Channel unregistered";
+ super.channelUnregistered(ctx);
+ }
- for (int i = 0; i < 3; i++) {
- try {
- s = store.readSegment(new SegmentId(store.getTracker(),
- uuid.getMostSignificantBits(), uuid
- .getLeastSignificantBits()));
- } catch (IllegalStateException e) {
- // segment not found
- log.warn(e.getMessage());
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, String payload)
+ throws Exception {
+ state = "got message";
+
+ String request = Messages.extractMessageFrom(payload);
+ InetSocketAddress client = (InetSocketAddress)ctx.channel().remoteAddress();
+ if (!clientAllowed(client)) {
+ log.warn("Got request from client " + client + " which is not in the allowed ip ranges! Request will be ignored.");
+ }
+ else {
+ observer.gotMessageFrom(Messages.extractClientFrom(payload), request, client);
+ if (Messages.GET_HEAD.equalsIgnoreCase(request)) {
+ RecordId r = headId();
+ if (r != null) {
+ ctx.writeAndFlush(r);
+ return;
}
- if (s != null) {
- break;
- } else {
- TimeUnit.MILLISECONDS.sleep(500);
+ } else if (request.startsWith(Messages.GET_SEGMENT)) {
+ String sid = request.substring(Messages.GET_SEGMENT.length());
+ log.debug("request segment id {}", sid);
+ UUID uuid = UUID.fromString(sid);
+
+ Segment s = null;
+
+ for (int i = 0; i < 3; i++) {
+ try {
+ s = store.readSegment(new SegmentId(store.getTracker(),
+ uuid.getMostSignificantBits(), uuid
+ .getLeastSignificantBits()));
+ } catch (IllegalStateException e) {
+ // segment not found
+ log.warn(e.getMessage());
+ }
+ if (s != null) {
+ break;
+ } else {
+ TimeUnit.MILLISECONDS.sleep(500);
+ }
}
- }
- if (s != null) {
- ctx.writeAndFlush(s);
+ if (s != null) {
+ ctx.writeAndFlush(s);
+ return;
+ }
} else {
- ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
+ log.warn("Unknown request {}, ignoring.", request);
}
- } else {
- log.warn("Unknown request {}, ignoring.", request);
- ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
}
+ ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
}
@Override
@@ -105,6 +182,7 @@ public class FailoverServerHandler exten
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ state = "exception occurred: " + cause.getMessage();
log.error(cause.getMessage(), cause);
ctx.close();
}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java Tue Aug 26 15:30:56 2014
@@ -16,7 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.failover.store;
-import static java.lang.Integer.parseInt;
import static java.lang.String.valueOf;
import static org.apache.felix.scr.annotations.ReferencePolicy.STATIC;
import static org.apache.felix.scr.annotations.ReferencePolicyOption.GREEDY;
@@ -32,6 +31,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.PropertyOption;
import org.apache.felix.scr.annotations.Reference;
+import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStoreService;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
@@ -42,36 +42,44 @@ import org.osgi.service.component.Compon
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
@Component(policy = ConfigurationPolicy.REQUIRE)
public class FailoverStoreService {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String MODE_MASTER = "master";
+ private static final String MODE_SLAVE = "slave";
+
+ public static final String MODE_DEFAULT = MODE_MASTER;
@Property(label = "Mode", description = "TarMK Failover mode (master or slave)", options = {
@PropertyOption(name = "master", value = "master"),
- @PropertyOption(name = "slave", value = "slave") }, value = "master")
+ @PropertyOption(name = "slave", value = "slave") }, value = MODE_DEFAULT)
public static final String MODE = "mode";
- @Property(label = "Port", description = "TarMK Failover port", intValue = 8023)
+ public static final int PORT_DEFAULT = 8023;
+ @Property(label = "Port", description = "TarMK Failover port", intValue = PORT_DEFAULT)
public static final String PORT = "port";
- @Property(label = "Master Host", description = "TarMK Failover master host (enabled for slave mode only)", value = "127.0.0.1")
+ public static final String MASTER_HOST_DEFAULT = "127.0.0.1";
+ @Property(label = "Master Host", description = "TarMK Failover master host (enabled for slave mode only)", value = MASTER_HOST_DEFAULT)
public static final String MASTER_HOST = "master.host";
- @Property(label = "Sync interval (seconds)", description = "TarMK Failover sync interval (seconds)", intValue = 5)
+ public static final int INTERVAL_DEFAULT = 5;
+ @Property(label = "Sync interval (seconds)", description = "TarMK Failover sync interval (seconds)", intValue = INTERVAL_DEFAULT)
public static final String INTERVAL = "interval";
- private static String MODE_MASTER = "master";
-
- private static String MODE_SLAVE = "slave";
+ public static final String ALLOWED_CLIENT_IP_RANGES_DEFAULT = null;
+ @Property(label = "Client allowed IP-Ranges", description = "accept incoming requests for these IP-ranges only")
+ public static final String ALLOWED_CLIENT_IP_RANGES = "master.allowed-client-ip-ranges";
@Reference(policy = STATIC, policyOption = GREEDY)
private NodeStore store = null;
-
private SegmentStore segmentStore;
private FailoverServer master = null;
private FailoverClient sync = null;
+
private ServiceRegistration syncReg = null;
@Activate
@@ -110,17 +118,19 @@ public class FailoverStoreService {
private void bootstrapMaster(ComponentContext context) {
Dictionary<?, ?> props = context.getProperties();
- int port = parseInt(valueOf(props.get(PORT)));
- master = new FailoverServer(port, segmentStore);
+ int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT);
+ String ipRanges = PropertiesUtil.toString(props.get(ALLOWED_CLIENT_IP_RANGES), ALLOWED_CLIENT_IP_RANGES_DEFAULT);
+ String[] ranges = ipRanges == null ? null : ipRanges.split(",");
+ master = new FailoverServer(port, segmentStore, ranges);
master.start();
- log.info("started failover master on port {}.", port);
+ log.info("started failover master on port {} with allowed ip ranges {}.", port, ipRanges);
}
private void bootstrapSlave(ComponentContext context) {
Dictionary<?, ?> props = context.getProperties();
- int port = parseInt(valueOf(props.get(PORT)));
- long interval = parseInt(valueOf(props.get(INTERVAL)));
- String host = valueOf(context.getProperties().get(MASTER_HOST));
+ int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT);
+ long interval = PropertiesUtil.toInteger(props.get(INTERVAL), INTERVAL_DEFAULT);
+ String host = PropertiesUtil.toString(props.get(MASTER_HOST), MASTER_HOST_DEFAULT);
sync = new FailoverClient(host, port, segmentStore);
Dictionary<Object, Object> dictionary = new Hashtable<Object, Object>();
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+import javax.annotation.Nonnull;
+
+public class DebugSegmentStore implements SegmentStore {
+
+ private final SegmentStore target;
+ public boolean createReadErrors;
+
+ public DebugSegmentStore(SegmentStore targetStore) {
+ this.target = targetStore;
+ }
+
+ @Override
+ public SegmentTracker getTracker() {
+ return this.target.getTracker();
+ }
+
+ @Nonnull
+ @Override
+ public SegmentNodeState getHead() {
+ return this.target.getHead();
+ }
+
+ @Override
+ public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
+ return this.target.setHead(base, head);
+ }
+
+ @Override
+ public boolean containsSegment(SegmentId id) {
+ return this.target.containsSegment(id);
+ }
+
+ @Override
+ public Segment readSegment(SegmentId segmentId) {
+ return createReadErrors ? null : this.target.readSegment(segmentId);
+ }
+
+ @Override
+ public void writeSegment(SegmentId id, byte[] bytes, int offset, int length) {
+ this.target.writeSegment(id, bytes, offset, length);
+ }
+
+ @Override
+ public void close() {
+ this.target.close();
+ }
+
+ @Override
+ public Blob readBlob(String reference) {
+ return this.target.readBlob(reference);
+ }
+
+ @Override
+ public BlobStore getBlobStore() {
+ return this.target.getBlobStore();
+ }
+
+ @Override
+ public void gc() {
+ this.target.gc();
+ }
+}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java Tue Aug 26 15:30:56 2014
@@ -18,9 +18,20 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import static java.io.File.createTempFile;
import static org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE;
import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS;
+import static org.junit.Assert.assertEquals;
+import java.io.File;
+import java.io.IOException;
import java.util.Random;
public class SegmentTestUtils {
@@ -38,4 +49,30 @@ public class SegmentTestUtils {
RecordId r = new RecordId(id, newValidOffset(random));
return r;
}
+
+ public static void assertEqualStores(File d1, File d2) throws IOException {
+ FileStore f1 = new FileStore(d1, 1, false);
+ FileStore f2 = new FileStore(d2, 1, false);
+ try {
+ assertEquals(f1.getHead(), f2.getHead());
+ } finally {
+ f1.close();
+ f2.close();
+ }
+ }
+
+ public static void addTestContent(NodeStore store, String child)
+ throws CommitFailedException {
+ NodeBuilder builder = store.getRoot().builder();
+ builder.child(child).setProperty("ts", System.currentTimeMillis());
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ public static File createTmpTargetDir(String name) throws IOException {
+ File f = createTempFile(name, "dir", new File("target"));
+ f.delete();
+ f.mkdir();
+ return f;
+ }
+
}
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class FailoverIPRangeTest extends TestBase {
+
+ @Before
+ public void setUp() throws Exception {
+ setUpServerAndClient();
+ }
+
+ @After
+ public void after() {
+ closeServerAndClient();
+ }
+
+ @Test
+ public void testFailoverAllClients() throws Exception {
+ createTestWithConfig(null, true);
+ }
+ @Test
+
+ public void testFailoverLocalClient() throws Exception {
+ createTestWithConfig(new String[]{"127.0.0.1"}, true);
+ }
+
+ @Test
+ public void testFailoverWrongClient() throws Exception {
+ createTestWithConfig(new String[]{"127.0.0.2"}, false);
+ }
+
+ @Test
+ public void testFailoverLocalhost() throws Exception {
+ createTestWithConfig(new String[]{"localhost"}, true);
+ }
+
+ @Test
+ public void testFailoverInvalidName() throws Exception {
+ createTestWithConfig(new String[]{"foobar"}, false);
+ }
+
+ @Test
+ public void testFailoverValidIPRangeStart() throws Exception {
+ createTestWithConfig(new String[]{"127.0.0.1-127.0.0.2"}, true);
+ }
+
+ @Test
+ public void testFailoverValidIPRangeEnd() throws Exception {
+ createTestWithConfig(new String[]{"127.0.0.0-127.0.0.1"}, true);
+ }
+
+ @Test
+ public void testFailoverValidIPRange() throws Exception {
+ createTestWithConfig(new String[]{"127.0.0.0-127.0.0.2"}, true);
+ }
+
+ @Test
+ public void testFailoverInvalidRange() throws Exception {
+ createTestWithConfig(new String[]{"127.0.0.2-127.0.0.1"}, false);
+ }
+
+ @Test
+ public void testFailoverCorrectList() throws Exception {
+ createTestWithConfig(new String[]{"foobar","127-128","126.0.0.1", "127.0.0.0-127.255.255.255"}, true);
+ }
+
+ @Test
+ public void testFailoverWrongList() throws Exception {
+ createTestWithConfig(new String[]{"foobar","126.0.0.1", "128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false);
+ }
+
+ private void createTestWithConfig(String[] ipRanges, boolean expectedToWork) throws Exception {
+ NodeStore store = new SegmentNodeStore(storeS);
+ final FailoverServer server = new FailoverServer(port, storeS, ipRanges);
+ server.start();
+ addTestContent(store, "server");
+
+ FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC);
+ cl.run();
+
+ try {
+ if (expectedToWork) {
+ assertEquals(storeS.getHead(), storeC.getHead());
+ }
+ else {
+ assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead()));
+ }
+ } finally {
+ server.close();
+ cl.close();
+ }
+
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+public class FailoverMultipleClientsTest extends TestBase {
+
+ @Before
+ public void setUp() throws Exception {
+ setUpServerAndTwoClients();
+ }
+
+ @After
+ public void after() {
+ closeServerAndTwoClients();
+ }
+
+ @Test
+ public void testMultipleClients() throws Exception {
+ NodeStore store = new SegmentNodeStore(storeS);
+ final FailoverServer server = new FailoverServer(port, storeS);
+ server.start();
+ SegmentTestUtils.addTestContent(store, "server");
+
+ FailoverClient cl1 = new FailoverClient("127.0.0.1", port, storeC);
+ FailoverClient cl2 = new FailoverClient("127.0.0.1", port, storeC2);
+
+ try {
+ assertFalse("first client has invalid initial store!", storeS.getHead().equals(storeC.getHead()));
+ assertFalse("second client has invalid initial store!", storeS.getHead().equals(storeC2.getHead()));
+ assertEquals(storeC.getHead(), storeC2.getHead());
+
+ cl1.run();
+ cl2.run();
+
+ assertEquals(storeS.getHead(), storeC.getHead());
+ assertEquals(storeS.getHead(), storeC2.getHead());
+
+ cl1.stop();
+ SegmentTestUtils.addTestContent(store, "test");
+ cl2.run();
+
+ assertEquals(storeS.getHead(), storeC2.getHead());
+ Assert.assertFalse("first client updated in stopped state!", storeS.getHead().equals(storeC.getHead()));
+
+ cl1.start();
+ assertEquals(storeS.getHead(), storeC.getHead());
+ } finally {
+ server.close();
+ cl1.close();
+ cl2.close();
+ }
+ }
+
+}
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java Tue Aug 26 15:30:56 2014
@@ -18,65 +18,32 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.failover;
-import static java.io.File.createTempFile;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
import static org.junit.Assert.assertEquals;
import java.io.File;
-import java.io.IOException;
-import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class FailoverTest {
-
- private int port = Integer.valueOf(System.getProperty(
- "failover.server.port", "52808"));
-
- private File directoryS;
-
- private FileStore storeS;
-
- private File directoryC;
-
- private FileStore storeC;
+public class FailoverTest extends TestBase {
@Before
- public void setUp() throws IOException {
- File target = new File("target");
-
- // server
- directoryS = createTempFile("FailoverServerTest", "dir", target);
- directoryS.delete();
- directoryS.mkdir();
- storeS = new FileStore(directoryS, 1, false);
-
- // client
- directoryC = createTempFile("FailoverClientTest", "dir", target);
- directoryC.delete();
- directoryC.mkdir();
- storeC = new FileStore(directoryC, 1, false);
+ public void setUp() throws Exception {
+ setUpServerAndClient();
}
@After
public void after() {
- storeS.close();
- storeC.close();
- try {
- FileUtils.deleteDirectory(directoryS);
- FileUtils.deleteDirectory(directoryC);
- } catch (IOException e) {
- }
+ closeServerAndClient();
}
@Test
@@ -84,13 +51,8 @@ public class FailoverTest {
NodeStore store = new SegmentNodeStore(storeS);
final FailoverServer server = new FailoverServer(port, storeS);
- Thread s = new Thread() {
- public void run() {
- server.start();
- }
- };
- s.start();
- addTestContent(store);
+ server.start();
+ addTestContent(store, "server");
FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC);
cl.run();
@@ -104,26 +66,8 @@ public class FailoverTest {
}
- private static void addTestContent(NodeStore store)
- throws CommitFailedException {
- NodeBuilder builder = store.getRoot().builder();
- builder.child("server").setProperty("ts", System.currentTimeMillis());
- store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- }
-
- static void assertEqualStores(File d1, File d2) throws IOException {
- FileStore f1 = new FileStore(d1, 1, false);
- FileStore f2 = new FileStore(d2, 1, false);
- try {
- assertEquals(f1.getHead(), f2.getHead());
- } finally {
- f1.close();
- f2.close();
- }
- }
-
public static void main(String[] args) throws Exception {
- File d = createTempFile("FailoverLiveTest", "dir", new File("target"));
+ File d = createTmpTargetDir("FailoverLiveTest");
d.delete();
d.mkdir();
FileStore s = new FileStore(d, 256, false);
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Set;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+public class MBeanTest extends TestBase {
+
+ @Before
+ public void setUp() throws Exception {
+ setUpServerAndClient();
+ }
+
+ @After
+ public void after() {
+ closeServerAndClient();
+ }
+
+ @Test
+ public void testServerEmptyConfig() throws Exception {
+ final FailoverServer server = new FailoverServer(this.port, this.storeS);
+ server.start();
+
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");
+ try {
+ Set<ObjectName> instances = jmxServer.queryNames(status, null);
+ assertEquals(1, instances.size());
+ status = instances.toArray(new ObjectName[0])[0];
+ assertEquals(new ObjectName(server.getMBeanName()), status);
+ assertTrue(jmxServer.isRegistered(status));
+
+ assertEquals("master", jmxServer.getAttribute(status, "Mode"));
+ String m = jmxServer.getAttribute(status, "Status").toString();
+ if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && !m.equals("Channel unregistered"))
+ fail("unexpected Status" + m);
+
+ assertEquals(FailoverStatusMBean.STATUS_STARTING, jmxServer.getAttribute(status, "Status"));
+ assertEquals(true, jmxServer.getAttribute(status, "Running"));
+ jmxServer.invoke(status, "stop", null, null);
+ assertEquals(false, jmxServer.getAttribute(status, "Running"));
+ assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+ jmxServer.invoke(status, "start", null, null);
+
+ assertEquals(true, jmxServer.getAttribute(status, "Running"));
+ assertEquals(FailoverStatusMBean.STATUS_STARTING, jmxServer.getAttribute(status, "Status"));
+ } finally {
+ server.close();
+ }
+
+ assertTrue(!jmxServer.isRegistered(status));
+ }
+
+ @Test
+ public void testClientEmptyConfigNoServer() throws Exception {
+ final FailoverClient client = new FailoverClient("127.0.0.1", this.port, this.storeC);
+ client.start();
+
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");
+ try {
+ Set<ObjectName> instances = jmxServer.queryNames(status, null);
+ assertEquals(1, instances.size());
+ status = instances.toArray(new ObjectName[0])[0];
+ assertEquals(new ObjectName(client.getMBeanName()), status);
+ assertTrue(jmxServer.isRegistered(status));
+
+ String m = jmxServer.getAttribute(status, "Mode").toString();
+ if (!m.startsWith("client: ")) fail("unexpected mode " + m);
+
+ assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+
+ assertEquals(false, jmxServer.getAttribute(status, "Running"));
+ jmxServer.invoke(status, "stop", null, null);
+ assertEquals(false, jmxServer.getAttribute(status, "Running"));
+ assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+ jmxServer.invoke(status, "start", null, null);
+ assertEquals(false, jmxServer.getAttribute(status, "Running"));
+ assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+ } finally {
+ client.close();
+ }
+
+ assertTrue(!jmxServer.isRegistered(status));
+ }
+
+ @Test
+ public void testClientNoServer() throws Exception {
+ System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Foo");
+ final FailoverClient client = new FailoverClient("127.0.0.1", this.port, this.storeC);
+ client.start();
+
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ ObjectName status = new ObjectName(client.getMBeanName());
+ try {
+ assertTrue(jmxServer.isRegistered(status));
+ assertEquals("client: Foo", jmxServer.getAttribute(status, "Mode"));
+ } finally {
+ client.close();
+ }
+
+ assertTrue(!jmxServer.isRegistered(status));
+ }
+
+ @Test
+ public void testClientAndServerEmptyConfig() throws Exception {
+ final FailoverServer server = new FailoverServer(this.port, this.storeS);
+ server.start();
+
+ System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Bar");
+ final FailoverClient client = new FailoverClient("127.0.0.1", this.port, this.storeC);
+ client.start();
+
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");
+ ObjectName clientStatus = new ObjectName(client.getMBeanName());
+ ObjectName serverStatus = new ObjectName(server.getMBeanName());
+ try {
+ Set<ObjectName> instances = jmxServer.queryNames(status, null);
+ assertEquals(3, instances.size());
+
+ assertTrue(jmxServer.isRegistered(clientStatus));
+ assertTrue(jmxServer.isRegistered(serverStatus));
+
+ String m = jmxServer.getAttribute(clientStatus, "Mode").toString();
+ if (!m.startsWith("client: ")) fail("unexpected mode " + m);
+
+ assertEquals("master", jmxServer.getAttribute(serverStatus, "Mode"));
+
+ assertEquals(true, jmxServer.getAttribute(serverStatus, "Running"));
+ assertEquals(true, jmxServer.getAttribute(clientStatus, "Running"));
+
+ // stop the master
+ jmxServer.invoke(serverStatus, "stop", null, null);
+ assertEquals(false, jmxServer.getAttribute(serverStatus, "Running"));
+ m = jmxServer.getAttribute(serverStatus, "Status").toString();
+ if (!m.equals(FailoverStatusMBean.STATUS_STOPPED) && !m.equals("Channel unregistered"))
+ fail("unexpected Status" + m);
+
+ // restart the master
+ jmxServer.invoke(serverStatus, "start", null, null);
+ assertEquals(true, jmxServer.getAttribute(serverStatus, "Running"));
+ assertEquals(true, jmxServer.getAttribute(clientStatus, "Running"));
+ m = jmxServer.getAttribute(serverStatus, "Status").toString();
+ if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && !m.equals("Channel unregistered"))
+ fail("unexpected Status" + m);
+
+ // stop the slave
+ jmxServer.invoke(clientStatus, "stop", null, null);
+ assertEquals(true, jmxServer.getAttribute(serverStatus, "Running"));
+ assertEquals(false, jmxServer.getAttribute(clientStatus, "Running"));
+ assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(clientStatus, "Status"));
+
+ // restart the slave
+ jmxServer.invoke(clientStatus, "start", null, null);
+ assertEquals(true, jmxServer.getAttribute(clientStatus, "Running"));
+ assertEquals(FailoverStatusMBean.STATUS_RUNNING, jmxServer.getAttribute(clientStatus, "Status"));
+
+ } finally {
+ client.close();
+ server.close();
+ }
+
+ assertTrue(!jmxServer.isRegistered(clientStatus));
+ assertTrue(!jmxServer.isRegistered(serverStatus));
+ }
+}
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+
+import org.apache.jackrabbit.oak.plugins.segment.DebugSegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class RecoverTest extends TestBase {
+
+ @Before
+ public void setUp() throws Exception {
+ setUpServerAndClient();
+ }
+
+ @After
+ public void after() {
+ closeServerAndClient();
+ }
+
+ @Test
+ public void testBrokenConnection() throws Exception {
+
+ NodeStore store = new SegmentNodeStore(storeS);
+ DebugSegmentStore s = new DebugSegmentStore(storeS);
+ final FailoverServer server = new FailoverServer(port, s);
+ s.createReadErrors = true;
+ server.start();
+ addTestContent(store, "server");
+
+ FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC);
+ cl.run();
+
+ try {
+ assertFalse("store are not expected to be equal", storeS.getHead().equals(storeC.getHead()));
+ s.createReadErrors = false;
+ cl.run();
+ assertEquals(storeS.getHead(), storeC.getHead());
+ } finally {
+ server.close();
+ cl.close();
+ }
+
+ }
+}
Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+
+public class TestBase {
+ int port = Integer.valueOf(System.getProperty("failover.server.port", "52808"));
+
+ File directoryS;
+ FileStore storeS;
+
+ File directoryC;
+ FileStore storeC;
+
+ File directoryC2;
+ FileStore storeC2;
+
+ public void setUpServerAndClient() throws IOException {
+ // server
+ directoryS = createTmpTargetDir("FailoverServerTest");
+ storeS = new FileStore(directoryS, 1, false);
+
+ // client
+ directoryC = createTmpTargetDir("FailoverClientTest");
+ storeC = new FileStore(directoryC, 1, false);
+ }
+
+ public void setUpServerAndTwoClients() throws Exception {
+ setUpServerAndClient();
+
+ directoryC2 = createTmpTargetDir("FailoverClient2Test");
+ storeC2 = new FileStore(directoryC2, 1, false);
+ }
+
+ public void closeServerAndClient() {
+ storeS.close();
+ storeC.close();
+ try {
+ FileUtils.deleteDirectory(directoryS);
+ FileUtils.deleteDirectory(directoryC);
+ } catch (IOException e) {
+ }
+ }
+
+ public void closeServerAndTwoClients() {
+ closeServerAndClient();
+ storeC2.close();
+ try {
+ FileUtils.deleteDirectory(directoryC2);
+ } catch (IOException e) {
+ }
+ }
+}