You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ba...@apache.org on 2014/08/26 17:30:56 UTC

svn commit: r1620634 - in /jackrabbit/oak/trunk/oak-tarmk-failover/src: main/java/org/apache/jackrabbit/oak/plugins/segment/failover/ main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/ main/java/org/apache/jackrabbit/oak/plugins/segme...

Author: baedke
Date: Tue Aug 26 15:30:56 2014
New Revision: 1620634

URL: http://svn.apache.org/r1620634
Log:
OAK-1915: TarMK failover 2.0

Bug fixed: master startup occasionally deadlocked due to race conditions.
Features added: MBean support, compression, checksums, master access restriction by IP address, and the client can now be started via an oak-run runmode.

Added unit tests.

Added:
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
Modified:
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.ObservablePartnerMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CommunicationObserver {
+
+    private class CommunicationPartnerMBean implements ObservablePartnerMBean {
+        private final ObjectName mbeanName;
+        private final String clientName;
+        public String lastRequest;
+        public Date lastSeen;
+        public String remoteAddress;
+        public int remotePort;
+
+        public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
+            this.clientName = clientName;
+            this.mbeanName = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
+        }
+
+        public ObjectName getMBeanName() {
+            return this.mbeanName;
+        }
+
+        @Override
+        public String getName() {
+            return this.clientName;
+        }
+
+        @Override
+        public String getRemoteAddress() {
+            return this.remoteAddress;
+        }
+
+        @Override
+        public String getLastRequest() {
+            return this.lastRequest;
+        }
+
+        @Override
+        public int getRemotePort() {
+            return this.remotePort;
+        }
+
+        @Override
+        public String getLastSeenTimestamp() {
+            return this.lastSeen == null ? null : this.lastSeen.toString();
+        }
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(CommunicationObserver.class);
+
+    private final String identifier;
+    private final Map<String, CommunicationPartnerMBean> partnerDetails;
+
+    public CommunicationObserver(String myID) {
+        this.identifier = myID;
+        this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
+    }
+
+    public void unregister() {
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+            try {
+                jmxServer.unregisterMBean(m.getMBeanName());
+            }
+            catch (Exception e) {
+                log.error("error unregistering mbean for client '" + m.getName() + "'", e);
+            }
+        }
+    }
+
+    public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        boolean register = false;
+        if (m == null) {
+            m = new CommunicationPartnerMBean(client);
+            m.remoteAddress = remote.getAddress().getHostAddress();
+            m.remotePort = remote.getPort();
+            register = true;
+        }
+        m.lastSeen = new Date();
+        m.lastRequest = request;
+        this.partnerDetails.put(client, m);
+        if (register) {
+            final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+            try {
+                jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
+            }
+            catch (Exception e) {
+                log.error("can register mbean for client '" + m.getName() + "'", e);
+            }
+        }
+    }
+
+    public String getID() {
+        return this.identifier;
+    }
+}

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Tue Aug 26 15:30:56 2014
@@ -27,6 +27,7 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.SnappyFramedDecoder;
 import io.netty.handler.codec.string.StringEncoder;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
@@ -34,15 +35,24 @@ import io.netty.util.concurrent.DefaultE
 import io.netty.util.concurrent.EventExecutorGroup;
 
 import java.io.Closeable;
+import java.lang.management.ManagementFactory;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class FailoverClient implements Runnable, Closeable {
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+public final class FailoverClient implements FailoverStatusMBean, Runnable, Closeable {
+    public static final String CLIENT_ID_PROPERTY_NAME = "failOverID";
 
     private static final Logger log = LoggerFactory
             .getLogger(FailoverClient.class);
@@ -52,20 +62,60 @@ public final class FailoverClient implem
     private int readTimeoutMs = 10000;
 
     private final FailoverStore store;
+    private final CommunicationObserver observer;
     private FailoverClientHandler handler;
     private EventLoopGroup group;
     private EventExecutorGroup executor;
+    private boolean running;
+    private String state;
 
     public FailoverClient(String host, int port, SegmentStore store) {
+        this.state = STATUS_INITIALIZING;
         this.host = host;
         this.port = port;
         this.store = new FailoverStore(store);
+        String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
+        this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s);
+
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.registerMBean(new StandardMBean(this, FailoverStatusMBean.class), new ObjectName(this.getMBeanName()));
+        }
+        catch (Exception e) {
+            log.error("can register failover status mbean", e);
+        }
     }
 
-    public void run() {
+    public String getMBeanName() {
+        return FailoverStatusMBean.JMX_NAME + ",id=\"" + this.observer.getID() + "\"";
+    }
+
+    public void close() {
+        stop();
+        state = STATUS_CLOSING;
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.unregisterMBean(new ObjectName(this.getMBeanName()));
+        }
+        catch (Exception e) {
+            log.error("can unregister failover status mbean", e);
+        }
+        observer.unregister();
+        if (group != null && !group.isShuttingDown()) {
+            group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+                    .syncUninterruptibly();
+        }
+        if (executor != null && !executor.isShuttingDown()) {
+            executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+                    .syncUninterruptibly();
+        }
+        state = STATUS_CLOSED;
+    }
 
+    public void run() {
+        state = STATUS_STARTING;
         this.executor = new DefaultEventExecutorGroup(4);
-        this.handler = new FailoverClientHandler(this.store, executor);
+        this.handler = new FailoverClientHandler(this.store, executor, this.observer);
 
         group = new NioEventLoopGroup();
         Bootstrap b = new Bootstrap();
@@ -83,33 +133,49 @@ public final class FailoverClient implem
                 // WriteTimeoutHandler & ReadTimeoutHandler
                 p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
                         readTimeoutMs, TimeUnit.MILLISECONDS));
-
                 p.addLast(new StringEncoder(CharsetUtil.UTF_8));
+                p.addLast(new SnappyFramedDecoder(true));
                 p.addLast(new RecordIdDecoder(store));
                 p.addLast(executor, handler);
             }
         });
         try {
             // Start the client.
+            running = true;
+            state = STATUS_RUNNING;
             ChannelFuture f = b.connect(host, port).sync();
             // Wait until the connection is closed.
             f.channel().closeFuture().sync();
         } catch (Exception e) {
             log.error("Failed synchronizing state.", e);
-        } finally {
-            close();
+            stop();
         }
     }
 
     @Override
-    public void close() {
-        if (group != null && !group.isShuttingDown()) {
-            group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
-        }
-        if (executor != null && !executor.isShuttingDown()) {
-            executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
+    public String getMode() {
+        return "client: " + this.observer.getID();
+    }
+
+    @Override
+    public boolean isRunning() { return running;}
+
+    @Override
+    public void start() {
+        if (!running) run();
+    }
+
+    @Override
+    public void stop() {
+        //TODO running flag doesn't make sense this way, since run() is usually scheduled to be called repeatedly.
+        if (running) {
+            running = false;
+            state = STATUS_STOPPED;
         }
     }
+
+    @Override
+    public String getStatus() {
+        return this.state;
+    }
 }

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java Tue Aug 26 15:30:56 2014
@@ -28,9 +28,11 @@ import io.netty.util.concurrent.GenericF
 import io.netty.util.concurrent.Promise;
 
 import java.io.Closeable;
+import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
@@ -45,6 +47,7 @@ public class FailoverClientHandler exten
 
     private final FailoverStore store;
     private final EventExecutorGroup executor;
+    private final CommunicationObserver observer;
     private EventExecutorGroup preloaderExecutor;
     private EventExecutorGroup loaderExecutor;
 
@@ -53,9 +56,10 @@ public class FailoverClientHandler exten
     private Promise<RecordId> headPromise;
 
     public FailoverClientHandler(final FailoverStore store,
-            EventExecutorGroup executor) {
+            EventExecutorGroup executor, CommunicationObserver observer) {
         this.store = store;
         this.executor = executor;
+        this.observer = observer;
     }
 
     @Override
@@ -91,7 +95,7 @@ public class FailoverClientHandler exten
                 }
             }
         });
-        ctx.writeAndFlush(newGetHeadReq()).addListener(
+        ctx.writeAndFlush(newGetHeadReq(this.observer.getID())).addListener(
                 new FailedRequestListener(headPromise));
     }
 
@@ -114,7 +118,7 @@ public class FailoverClientHandler exten
 
         loaderExecutor = new DefaultEventExecutorGroup(4);
         SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
-                preloaderExecutor, loaderExecutor);
+                preloaderExecutor, loaderExecutor, this.observer.getID());
         ctx.pipeline().addLast(loaderExecutor, h2);
 
         h1.channelActive(ctx);

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java Tue Aug 26 15:30:56 2014
@@ -48,6 +48,7 @@ public class SegmentLoaderHandler extend
             .getLogger(SegmentLoaderHandler.class);
 
     private final FailoverStore store;
+    private final String clientID;
     private final RecordId head;
     private final EventExecutorGroup preloaderExecutor;
     private final EventExecutorGroup loaderExecutor;
@@ -60,11 +61,13 @@ public class SegmentLoaderHandler extend
 
     public SegmentLoaderHandler(final FailoverStore store, RecordId head,
             EventExecutorGroup preloaderExecutor,
-            EventExecutorGroup loaderExecutor) {
+            EventExecutorGroup loaderExecutor,
+            String clientID) {
         this.store = store;
         this.head = head;
         this.preloaderExecutor = preloaderExecutor;
         this.loaderExecutor = loaderExecutor;
+        this.clientID = clientID;
     }
 
     @Override
@@ -103,7 +106,7 @@ public class SegmentLoaderHandler extend
 
     @Override
     public Segment readSegment(final SegmentId id) {
-        ctx.writeAndFlush(newGetSegmentReq(id)).addListener(reqListener);
+        ctx.writeAndFlush(newGetSegmentReq(this.clientID, id)).addListener(reqListener);
         return getSegment();
     }
 

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java Tue Aug 26 15:30:56 2014
@@ -26,14 +26,37 @@ public class Messages {
     public static final byte HEADER_SEGMENT = 0x01;
 
     public static final String GET_HEAD = "h";
-
     public static final String GET_SEGMENT = "s.";
 
-    public static String newGetHeadReq() {
-        return GET_HEAD + "\r\n";
+    private static final String MAGIC = "FailOver-CMD@";
+    private static final String SEPARATOR = ":";
+
+    private static String newRequest(String clientID, String body) {
+        return MAGIC + (clientID == null ? "" : clientID.replace(SEPARATOR, "#")) + SEPARATOR + body + "\r\n";
+    }
+
+    public static String newGetHeadReq(String clientID) {
+        return newRequest(clientID, GET_HEAD);
+    }
+
+    public static String newGetSegmentReq(String clientID, SegmentId sid) {
+        return newRequest(clientID, GET_SEGMENT + sid.toString());
+    }
+
+    public static String extractMessageFrom(String payload) {
+        if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
+            int i = payload.indexOf(SEPARATOR);
+            return payload.substring(i + 1);
+        }
+        return null;
     }
 
-    public static String newGetSegmentReq(SegmentId sid) {
-        return GET_SEGMENT + sid.toString() + "\r\n";
+    public static String extractClientFrom(String payload) {
+        if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
+            payload = payload.substring(MAGIC.length());
+            int i = payload.indexOf(SEPARATOR);
+            return payload.substring(0, i);
+        }
+        return null;
     }
 }

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/FailoverStatusMBean.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.failover.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import javax.annotation.Nonnull;
+
+public interface FailoverStatusMBean {
+    public static final String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"FailOver\"";
+    public static final String STATUS_INITIALIZING = "initializing";
+    public static final String STATUS_STOPPED = "stopped";
+    public static final String STATUS_STARTING = "starting";
+    public static final String STATUS_RUNNING = "running";
+    public static final String STATUS_CLOSING = "closing";
+    public static final String STATUS_CLOSED = "closed";
+
+    @Nonnull
+    @Description("master or client")
+    String getMode();
+
+    @Description("current status of the service")
+    String getStatus();
+
+    @Description("instance is running")
+    boolean isRunning();
+
+    @Description("stop the failover communication")
+    void stop();
+
+    @Description("start the failover communication")
+    void start();
+}

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+public interface ObservablePartnerMBean {
+
+    @Nonnull
+    @Description("name of the partner")
+    String getName();
+
+    @Description("IP of the remote")
+    String getRemoteAddress();
+
+    @Description("Last request")
+    String getLastRequest();
+
+    @Description("Port of the remote")
+    int getRemotePort();
+
+    @CheckForNull
+    @Description("Time the remote instance was last contacted")
+    String getLastSeenTimestamp();
+}

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java Tue Aug 26 15:30:56 2014
@@ -19,6 +19,7 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover.server;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -27,30 +28,62 @@ import io.netty.channel.nio.NioEventLoop
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.compression.SnappyFramedEncoder;
 import io.netty.handler.codec.string.StringDecoder;
 import io.netty.util.CharsetUtil;
 
 import java.io.Closeable;
+import java.lang.management.ManagementFactory;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.util.concurrent.Future;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdEncoder;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class FailoverServer implements Closeable {
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+public class FailoverServer implements FailoverStatusMBean, Closeable {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(FailoverServer.class);
 
     private final int port;
 
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final ServerBootstrap b;
+    private final CommunicationObserver observer;
+    private final FailoverServerHandler handler;
+    private ChannelFuture channelFuture;
+    private boolean running;
 
     public FailoverServer(int port, final SegmentStore store) {
+        this(port, store, null);
+    }
+
+    public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges) {
         this.port = port;
 
+        observer = new CommunicationObserver("master");
+        handler = new FailoverServerHandler(store, observer, allowedClientIPRanges);
         bossGroup = new NioEventLoopGroup(1);
         workerGroup = new NioEventLoopGroup();
 
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.registerMBean(new StandardMBean(this, FailoverStatusMBean.class), new ObjectName(this.getMBeanName()));
+        }
+        catch (Exception e) {
+            log.error("can register failover status mbean", e);
+        }
+
         b = new ServerBootstrap();
         b.group(bossGroup, workerGroup);
         b.channel(NioServerSocketChannel.class);
@@ -67,32 +100,92 @@ public class FailoverServer implements C
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(new LineBasedFrameDecoder(8192));
                 p.addLast(new StringDecoder(CharsetUtil.UTF_8));
+                p.addLast(new SnappyFramedEncoder());
                 p.addLast(new RecordIdEncoder());
                 p.addLast(new SegmentEncoder());
-                p.addLast(new FailoverServerHandler(store));
+                p.addLast(handler);
             }
         });
     }
 
-    public void start() {
-        try {
-            b.bind(port).sync().channel().closeFuture().sync();
-        } catch (InterruptedException e) {
-            close();
-        }
+    public String getMBeanName() { // JMX object name under which this server is registered and unregistered
+        return FailoverStatusMBean.JMX_NAME + ",id=" + this.port; // the port is used as the id key
     }
 
-    @Override
     public void close() {
+        stop(); // disconnects the server channel if the server is currently running
+        handler.state = STATUS_CLOSING;
+        observer.unregister();
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.unregisterMBean(new ObjectName(this.getMBeanName()));
+        }
+        catch (Exception e) {
+            log.error("cannot unregister failover status mbean", e); // logged only: the event loops below are still shut down
+        }
         if (bossGroup != null && !bossGroup.isShuttingDown()) {
-            bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
+            bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
         }
         if (workerGroup != null && !workerGroup.isShuttingDown()) {
-            workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
-            ;
+            workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+        }
+        handler.state = STATUS_CLOSED;
+    }
+
+    @Override
+    public void start() {
+        if (running) return; // already started: nothing to do
+
+        running = true;
+        this.handler.state = STATUS_STARTING;
+
+        Future<?> startup = bossGroup.submit(new Runnable() { // the bind is issued from the boss event loop
+            @Override
+            public void run() {
+                //netty 4.0.20 has a race condition issue with
+                //asynchronous channel registration. As a workaround
+                //we bind asynchronously from the boss event group to make
+                //the channel registration synchronous.
+                //Note that now this method will return immediately.
+                channelFuture = b.bind(port);
+                new Thread() { // helper thread that blocks until the server channel is closed
+                    @Override
+                    public void run() {
+                        try {
+                            running = true;
+                            channelFuture.sync().channel().closeFuture().sync();
+                        } catch (InterruptedException e) {
+                            FailoverServer.this.stop();
+                        }
+                    }
+                }.start();
+            }
+        });
+        if (!startup.awaitUninterruptibly(10000)) { // wait at most 10000 ms for the submitted task to finish
+            log.error("FailoverServer failed to start within 10 seconds and will be canceled");
+            startup.cancel(true);
+        }
+    }
+
+    @Override
+    public String getMode() {
+        return "master"; // constant for this server class; same name that is passed to its CommunicationObserver
+    }
+
+    @Override
+    public boolean isRunning() { return running; } // flag is set in start() and cleared in stop()
+
+    @Override
+    public void stop() {
+        if (running) { // no-op unless start() has been called
+            running = false;
+            this.handler.state = STATUS_STOPPED;
+            if (channelFuture != null) { channelFuture.channel().disconnect(); } // null when the startup task never ran
         }
     }
 
+    @Override
+    public String getStatus() {
+        return handler == null ? STATUS_INITIALIZING : handler.state; // state is written by the handler and by start()/stop()/close()
+    }
 }

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java Tue Aug 26 15:30:56 2014
@@ -18,6 +18,9 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.failover.server;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -30,6 +33,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,9 +45,14 @@ public class FailoverServerHandler exten
             .getLogger(FailoverServerHandler.class);
 
     private final SegmentStore store;
+    private final CommunicationObserver observer;
+    private final String[] allowedIPRanges;
+    public String state;
 
-    public FailoverServerHandler(SegmentStore store) {
+    public FailoverServerHandler(SegmentStore store, CommunicationObserver observer, String[] allowedIPRanges) {
         this.store = store;
+        this.observer = observer; // notified in channelRead0 for every accepted request
+        this.allowedIPRanges = allowedIPRanges; // may be null, in which case clientAllowed() accepts every client
     }
 
     private RecordId headId() {
@@ -53,49 +62,117 @@ public class FailoverServerHandler exten
         return null;
     }
 
-    @Override
-    public void channelRead0(ChannelHandlerContext ctx, String request)
-            throws Exception {
-        if (Messages.GET_HEAD.equalsIgnoreCase(request)) {
-            RecordId r = headId();
-            if (r != null) {
-                ctx.writeAndFlush(r);
-            } else {
-                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
+    private static long ipToLong(InetAddress ip) { // packs the raw address bytes into a long, first byte most significant
+        byte[] octets = ip.getAddress();
+        long result = 0;
+        for (byte octet : octets) {
+            result <<= 8;
+            result |= octet & 0xff; // mask so the signed byte is treated as an unsigned value
+        }
+        return result; // NOTE(review): a long holds 8 bytes, so a 16-byte IPv6 address keeps only its low 64 bits
+    }
+
+    private boolean clientAllowed(InetSocketAddress client) { // true if no ranges are configured or the client matches one entry
+        if (this.allowedIPRanges != null) {
+            for (String s : this.allowedIPRanges) {
+                try {
+                    if (ipToLong(InetAddress.getByName(s.trim())) == ipToLong(client.getAddress())) { // trimmed like the range bounds below
+                        return true;
+                    }
+                }
+                catch (UnknownHostException ignored) { /* it's an ip range */ }
+                int i = s.indexOf('-'); // entries of the form start-end are treated as an inclusive range
+                if (i > 0) {
+                    try {
+                        long startIPRange = ipToLong(InetAddress.getByName(s.substring(0, i).trim()));
+                        long endIPRange = ipToLong(InetAddress.getByName(s.substring(i + 1).trim()));
+                        long ipl = ipToLong(client.getAddress());
+                        if (startIPRange <= ipl && ipl <= endIPRange) return true;
+                    }
+                    catch (Exception e) {
+                        log.warn("invalid IP-range format: {}", s); // a malformed entry is skipped, remaining entries are still checked
+                    }
+                }
             }
+            return false; // ranges are configured but none matched
+        }
+        return true;
+    }
+
+    @Override
+    public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+        state = "Channel registered"; // record the lifecycle event in the public state field
+        super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+        state = "Channel active"; // record the lifecycle event before delegating to the superclass
+        super.channelActive(ctx);
+    }
 
-        } else if (request.startsWith(Messages.GET_SEGMENT)) {
-            String sid = request.substring(Messages.GET_SEGMENT.length());
-            log.debug("request segment id {}", sid);
-            UUID uuid = UUID.fromString(sid);
+    @Override
+    public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+        state = "Channel inactive"; // record the lifecycle event before delegating to the superclass
+        super.channelInactive(ctx);
+    }
 
-            Segment s = null;
+    @Override
+    public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
+        state = "Channel unregistered"; // record the lifecycle event before delegating to the superclass
+        super.channelUnregistered(ctx);
+    }
 
-            for (int i = 0; i < 3; i++) {
-                try {
-                    s = store.readSegment(new SegmentId(store.getTracker(),
-                            uuid.getMostSignificantBits(), uuid
-                                    .getLeastSignificantBits()));
-                } catch (IllegalStateException e) {
-                    // segment not found
-                    log.warn(e.getMessage());
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, String payload)
+            throws Exception {
+        state = "got message";
+
+        String request = Messages.extractMessageFrom(payload);
+        InetSocketAddress client = (InetSocketAddress)ctx.channel().remoteAddress();
+        if (!clientAllowed(client)) { // rejected clients fall through to the empty-buffer reply at the end
+            log.warn("Got request from client " + client + " which is not in the allowed ip ranges! Request will be ignored.");
+        }
+        else {
+            observer.gotMessageFrom(Messages.extractClientFrom(payload), request, client);
+            if (Messages.GET_HEAD.equalsIgnoreCase(request)) {
+                RecordId r = headId();
+                if (r != null) {
+                    ctx.writeAndFlush(r);
+                    return; // head id sent: skip the empty-buffer reply
                 }
-                if (s != null) {
-                    break;
-                } else {
-                    TimeUnit.MILLISECONDS.sleep(500);
+            } else if (request.startsWith(Messages.GET_SEGMENT)) {
+                String sid = request.substring(Messages.GET_SEGMENT.length()); // the segment id follows the command prefix
+                log.debug("request segment id {}", sid);
+                UUID uuid = UUID.fromString(sid);
+
+                Segment s = null;
+
+                for (int i = 0; i < 3; i++) { // up to three read attempts
+                    try {
+                        s = store.readSegment(new SegmentId(store.getTracker(),
+                                uuid.getMostSignificantBits(), uuid
+                                .getLeastSignificantBits()));
+                    } catch (IllegalStateException e) {
+                        // segment not found
+                        log.warn(e.getMessage());
+                    }
+                    if (s != null) {
+                        break;
+                    } else {
+                        TimeUnit.MILLISECONDS.sleep(500); // pause 500 ms before the next attempt
+                    }
                 }
-            }
 
-            if (s != null) {
-                ctx.writeAndFlush(s);
+                if (s != null) {
+                    ctx.writeAndFlush(s);
+                    return; // segment sent: skip the empty-buffer reply
+                }
             } else {
-                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
+                log.warn("Unknown request {}, ignoring.", request);
             }
-        } else {
-            log.warn("Unknown request {}, ignoring.", request);
-            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
         }
+        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); // reply for rejected clients, unknown requests, and a missing head or segment
     }
 
     @Override
@@ -105,6 +182,7 @@ public class FailoverServerHandler exten
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        state = "exception occurred: " + cause.getMessage(); // keep the failure visible through the state field
         log.error(cause.getMessage(), cause);
         ctx.close();
     }

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java Tue Aug 26 15:30:56 2014
@@ -16,7 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.failover.store;
 
-import static java.lang.Integer.parseInt;
 import static java.lang.String.valueOf;
 import static org.apache.felix.scr.annotations.ReferencePolicy.STATIC;
 import static org.apache.felix.scr.annotations.ReferencePolicyOption.GREEDY;
@@ -32,6 +31,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.PropertyOption;
 import org.apache.felix.scr.annotations.Reference;
+import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStoreService;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
@@ -42,36 +42,44 @@ import org.osgi.service.component.Compon
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 @Component(policy = ConfigurationPolicy.REQUIRE)
 public class FailoverStoreService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String MODE_MASTER = "master";
+    private static final String MODE_SLAVE = "slave";
+
+    public static final String MODE_DEFAULT = MODE_MASTER;
     @Property(label = "Mode", description = "TarMK Failover mode (master or slave)", options = {
             @PropertyOption(name = "master", value = "master"),
-            @PropertyOption(name = "slave", value = "slave") }, value = "master")
+            @PropertyOption(name = "slave", value = "slave") }, value = MODE_DEFAULT)
     public static final String MODE = "mode";
 
-    @Property(label = "Port", description = "TarMK Failover port", intValue = 8023)
+    public static final int PORT_DEFAULT = 8023;
+    @Property(label = "Port", description = "TarMK Failover port", intValue = PORT_DEFAULT)
     public static final String PORT = "port";
 
-    @Property(label = "Master Host", description = "TarMK Failover master host (enabled for slave mode only)", value = "127.0.0.1")
+    public static final String MASTER_HOST_DEFAULT = "127.0.0.1";
+    @Property(label = "Master Host", description = "TarMK Failover master host (enabled for slave mode only)", value = MASTER_HOST_DEFAULT)
     public static final String MASTER_HOST = "master.host";
 
-    @Property(label = "Sync interval (seconds)", description = "TarMK Failover sync interval (seconds)", intValue = 5)
+    public static final int INTERVAL_DEFAULT = 5;
+    @Property(label = "Sync interval (seconds)", description = "TarMK Failover sync interval (seconds)", intValue = INTERVAL_DEFAULT)
     public static final String INTERVAL = "interval";
 
-    private static String MODE_MASTER = "master";
-
-    private static String MODE_SLAVE = "slave";
+    public static final String ALLOWED_CLIENT_IP_RANGES_DEFAULT = null;
+    @Property(label = "Client allowed IP-Ranges", description = "accept incoming requests for these IP-ranges only")
+    public static final String ALLOWED_CLIENT_IP_RANGES = "master.allowed-client-ip-ranges";
 
     @Reference(policy = STATIC, policyOption = GREEDY)
     private NodeStore store = null;
-
     private SegmentStore segmentStore;
 
     private FailoverServer master = null;
     private FailoverClient sync = null;
+
     private ServiceRegistration syncReg = null;
 
     @Activate
@@ -110,17 +118,19 @@ public class FailoverStoreService {
 
     private void bootstrapMaster(ComponentContext context) {
         Dictionary<?, ?> props = context.getProperties();
-        int port = parseInt(valueOf(props.get(PORT)));
-        master = new FailoverServer(port, segmentStore);
+        int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT); // falls back to PORT_DEFAULT
+        String ipRanges = PropertiesUtil.toString(props.get(ALLOWED_CLIENT_IP_RANGES), ALLOWED_CLIENT_IP_RANGES_DEFAULT);
+        String[] ranges = ipRanges == null ? null : ipRanges.split(","); // comma-separated entries, not trimmed here; null stays null
+        master = new FailoverServer(port, segmentStore, ranges);
         master.start();
-        log.info("started failover master on port {}.", port);
+        log.info("started failover master on port {} with allowed ip ranges {}.", port, ipRanges);
     }
 
     private void bootstrapSlave(ComponentContext context) {
         Dictionary<?, ?> props = context.getProperties();
-        int port = parseInt(valueOf(props.get(PORT)));
-        long interval = parseInt(valueOf(props.get(INTERVAL)));
-        String host = valueOf(context.getProperties().get(MASTER_HOST));
+        int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT);
+        long interval = PropertiesUtil.toInteger(props.get(INTERVAL), INTERVAL_DEFAULT);
+        String host = PropertiesUtil.toString(props.get(MASTER_HOST), MASTER_HOST_DEFAULT);
 
         sync = new FailoverClient(host, port, segmentStore);
         Dictionary<Object, Object> dictionary = new Hashtable<Object, Object>();

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+import javax.annotation.Nonnull;
+
+public class DebugSegmentStore implements SegmentStore { // wrapper that forwards every call to a target store
+
+    private final SegmentStore target; // store that receives all delegated calls
+    public boolean createReadErrors; // when true, readSegment() returns null instead of delegating
+
+    public DebugSegmentStore(SegmentStore targetStore) {
+        this.target = targetStore;
+    }
+
+    @Override
+    public SegmentTracker getTracker() {
+        return this.target.getTracker();
+    }
+
+    @Nonnull
+    @Override
+    public SegmentNodeState getHead() {
+        return this.target.getHead();
+    }
+
+    @Override
+    public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
+        return this.target.setHead(base, head);
+    }
+
+    @Override
+    public boolean containsSegment(SegmentId id) {
+        return this.target.containsSegment(id);
+    }
+
+    @Override
+    public Segment readSegment(SegmentId segmentId) {
+        return createReadErrors ? null : this.target.readSegment(segmentId); // the only method that does not always delegate
+    }
+
+    @Override
+    public void writeSegment(SegmentId id, byte[] bytes, int offset, int length) {
+        this.target.writeSegment(id, bytes, offset, length);
+    }
+
+    @Override
+    public void close() {
+        this.target.close(); // closes the wrapped store as well
+    }
+
+    @Override
+    public Blob readBlob(String reference) {
+        return this.target.readBlob(reference);
+    }
+
+    @Override
+    public BlobStore getBlobStore() {
+        return this.target.getBlobStore();
+    }
+
+    @Override
+    public void gc() {
+        this.target.gc();
+    }
+}

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java Tue Aug 26 15:30:56 2014
@@ -18,9 +18,20 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import static java.io.File.createTempFile;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS;
+import static org.junit.Assert.assertEquals;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Random;
 
 public class SegmentTestUtils {
@@ -38,4 +49,30 @@ public class SegmentTestUtils {
         RecordId r = new RecordId(id, newValidOffset(random));
         return r;
     }
+
+    public static void assertEqualStores(File d1, File d2) throws IOException { // opens both directories as FileStores and compares heads
+        FileStore f1 = new FileStore(d1, 1, false);
+        FileStore f2 = new FileStore(d2, 1, false);
+        try {
+            assertEquals(f1.getHead(), f2.getHead());
+        } finally {
+            f1.close(); // NOTE(review): if this close throws, f2.close() below is not reached
+            f2.close();
+        }
+    }
+
+    public static void addTestContent(NodeStore store, String child) // merges one change into the given store
+            throws CommitFailedException {
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child(child).setProperty("ts", System.currentTimeMillis()); // child node gets the current time as its "ts" property
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    public static File createTmpTargetDir(String name) throws IOException { // creates a uniquely named directory under "target"
+        File f = createTempFile(name, "dir", new File("target")); // reserves a unique file name first
+        // swap the temp file for a directory of the same name; report a failure instead of ignoring it
+        if (!f.delete() || !f.mkdir()) { throw new IOException("cannot create directory " + f); }
+        return f;
+    }
+
 }

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class FailoverIPRangeTest extends TestBase { // exercises the allowed-client IP range option of FailoverServer
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+    }
+
+    @Test
+    public void testFailoverAllClients() throws Exception {
+        createTestWithConfig(null, true); // null ranges: no restriction configured
+    }
+
+    @Test
+    public void testFailoverLocalClient() throws Exception {
+        createTestWithConfig(new String[]{"127.0.0.1"}, true);
+    }
+
+    @Test
+    public void testFailoverWrongClient() throws Exception {
+        createTestWithConfig(new String[]{"127.0.0.2"}, false);
+    }
+
+    @Test
+    public void testFailoverLocalhost() throws Exception {
+        createTestWithConfig(new String[]{"localhost"}, true); // host names are accepted as entries too
+    }
+
+    @Test
+    public void testFailoverInvalidName() throws Exception {
+        createTestWithConfig(new String[]{"foobar"}, false);
+    }
+
+    @Test
+    public void testFailoverValidIPRangeStart() throws Exception {
+        createTestWithConfig(new String[]{"127.0.0.1-127.0.0.2"}, true); // range bounds are inclusive
+    }
+
+    @Test
+    public void testFailoverValidIPRangeEnd() throws Exception {
+        createTestWithConfig(new String[]{"127.0.0.0-127.0.0.1"}, true);
+    }
+
+    @Test
+    public void testFailoverValidIPRange() throws Exception {
+        createTestWithConfig(new String[]{"127.0.0.0-127.0.0.2"}, true);
+    }
+
+    @Test
+    public void testFailoverInvalidRange() throws Exception {
+        createTestWithConfig(new String[]{"127.0.0.2-127.0.0.1"}, false); // start above end: nothing can match
+    }
+
+    @Test
+    public void testFailoverCorrectList() throws Exception {
+        createTestWithConfig(new String[]{"foobar","127-128","126.0.0.1", "127.0.0.0-127.255.255.255"}, true);
+    }
+
+    @Test
+    public void testFailoverWrongList() throws Exception {
+        createTestWithConfig(new String[]{"foobar","126.0.0.1", "128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false);
+    }
+
+    private void createTestWithConfig(String[] ipRanges, boolean expectedToWork) throws Exception { // one server/client round with the given ranges
+        NodeStore store = new SegmentNodeStore(storeS);
+        final FailoverServer server = new FailoverServer(port, storeS, ipRanges);
+        server.start();
+        addTestContent(store, "server");
+
+        FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC); // the client connects to the server at 127.0.0.1
+        cl.run();
+
+        try {
+            if (expectedToWork) {
+                assertEquals(storeS.getHead(), storeC.getHead());
+            }
+            else {
+                assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead()));
+            }
+        } finally {
+            server.close();
+            cl.close();
+        }
+
+    }
+
+}

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+public class FailoverMultipleClientsTest extends TestBase { // one server store synced by two independent clients
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndTwoClients();
+    }
+
+    @After
+    public void after() {
+        closeServerAndTwoClients();
+    }
+
+    @Test
+    public void testMultipleClients() throws Exception {
+        NodeStore store = new SegmentNodeStore(storeS);
+        final FailoverServer server = new FailoverServer(port, storeS);
+        server.start();
+        SegmentTestUtils.addTestContent(store, "server");
+
+        FailoverClient cl1 = new FailoverClient("127.0.0.1", port, storeC);
+        FailoverClient cl2 = new FailoverClient("127.0.0.1", port, storeC2);
+
+        try {
+            Assert.assertFalse("first client has invalid initial store!", storeS.getHead().equals(storeC.getHead()));
+            Assert.assertFalse("second client has invalid initial store!", storeS.getHead().equals(storeC2.getHead()));
+            assertEquals(storeC.getHead(), storeC2.getHead());
+
+            cl1.run();
+            cl2.run();
+
+            assertEquals(storeS.getHead(), storeC.getHead());
+            assertEquals(storeS.getHead(), storeC2.getHead());
+
+            cl1.stop(); // from here on only the second client is expected to follow the server
+            SegmentTestUtils.addTestContent(store, "test");
+            cl2.run();
+
+            assertEquals(storeS.getHead(), storeC2.getHead());
+            Assert.assertFalse("first client updated in stopped state!", storeS.getHead().equals(storeC.getHead()));
+
+            cl1.start();
+            assertEquals(storeS.getHead(), storeC.getHead());
+        } finally {
+            server.close();
+            cl1.close();
+            cl2.close();
+        }
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java?rev=1620634&r1=1620633&r2=1620634&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java Tue Aug 26 15:30:56 2014
@@ -18,65 +18,32 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.failover;
 
-import static java.io.File.createTempFile;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
-import java.io.IOException;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
 import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
 import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class FailoverTest {
-
-    private int port = Integer.valueOf(System.getProperty(
-            "failover.server.port", "52808"));
-
-    private File directoryS;
-
-    private FileStore storeS;
-
-    private File directoryC;
-
-    private FileStore storeC;
+public class FailoverTest extends TestBase {
 
     @Before
-    public void setUp() throws IOException {
-        File target = new File("target");
-
-        // server
-        directoryS = createTempFile("FailoverServerTest", "dir", target);
-        directoryS.delete();
-        directoryS.mkdir();
-        storeS = new FileStore(directoryS, 1, false);
-
-        // client
-        directoryC = createTempFile("FailoverClientTest", "dir", target);
-        directoryC.delete();
-        directoryC.mkdir();
-        storeC = new FileStore(directoryC, 1, false);
+    public void setUp() throws Exception {
+        setUpServerAndClient();
     }
 
     @After
     public void after() {
-        storeS.close();
-        storeC.close();
-        try {
-            FileUtils.deleteDirectory(directoryS);
-            FileUtils.deleteDirectory(directoryC);
-        } catch (IOException e) {
-        }
+        closeServerAndClient();
     }
 
     @Test
@@ -84,13 +51,8 @@ public class FailoverTest {
 
         NodeStore store = new SegmentNodeStore(storeS);
         final FailoverServer server = new FailoverServer(port, storeS);
-        Thread s = new Thread() {
-            public void run() {
-                server.start();
-            }
-        };
-        s.start();
-        addTestContent(store);
+        server.start();
+        addTestContent(store, "server");
 
         FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC);
         cl.run();
@@ -104,26 +66,8 @@ public class FailoverTest {
 
     }
 
-    private static void addTestContent(NodeStore store)
-            throws CommitFailedException {
-        NodeBuilder builder = store.getRoot().builder();
-        builder.child("server").setProperty("ts", System.currentTimeMillis());
-        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-    }
-
-    static void assertEqualStores(File d1, File d2) throws IOException {
-        FileStore f1 = new FileStore(d1, 1, false);
-        FileStore f2 = new FileStore(d2, 1, false);
-        try {
-            assertEquals(f1.getHead(), f2.getHead());
-        } finally {
-            f1.close();
-            f2.close();
-        }
-    }
-
     public static void main(String[] args) throws Exception {
-        File d = createTempFile("FailoverLiveTest", "dir", new File("target"));
+        File d = createTmpTargetDir("FailoverLiveTest");
         d.delete();
         d.mkdir();
         FileStore s = new FileStore(d, 256, false);

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Set;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+/** Tests the status MBeans that FailoverServer and FailoverClient register in the platform MBeanServer. */
+public class MBeanTest extends TestBase {
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        // two tests set the client id property; clear it so it cannot leak into later tests in this JVM
+        System.clearProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME);
+        closeServerAndClient();
+    }
+
+    /** A lone server registers exactly one status bean, in mode "master", and can be stopped and restarted through it. */
+    @Test
+    public void testServerEmptyConfig() throws Exception {
+        final FailoverServer server = new FailoverServer(this.port, this.storeS);
+        server.start();
+
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");
+        try {
+            Set<ObjectName> instances = jmxServer.queryNames(status, null);
+            assertEquals(1, instances.size());
+            status = instances.iterator().next();
+            assertEquals(new ObjectName(server.getMBeanName()), status);
+            assertTrue(jmxServer.isRegistered(status));
+
+            assertEquals("master", jmxServer.getAttribute(status, "Mode"));
+            assertStatus(jmxServer, status, FailoverStatusMBean.STATUS_STARTING);
+            assertEquals(true, jmxServer.getAttribute(status, "Running"));
+            jmxServer.invoke(status, "stop", null, null);
+            assertEquals(false, jmxServer.getAttribute(status, "Running"));
+            assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+            jmxServer.invoke(status, "start", null, null);
+            assertEquals(true, jmxServer.getAttribute(status, "Running"));
+            assertEquals(FailoverStatusMBean.STATUS_STARTING, jmxServer.getAttribute(status, "Status"));
+        } finally {
+            server.close();
+        }
+
+        assertTrue(!jmxServer.isRegistered(status));
+    }
+
+    /** With no server started, the client's bean stays STATUS_STOPPED and not Running, even across stop/start. */
+    @Test
+    public void testClientEmptyConfigNoServer() throws Exception {
+        final FailoverClient client = new FailoverClient("127.0.0.1", this.port, this.storeC);
+        client.start();
+
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");
+        try {
+            Set<ObjectName> instances = jmxServer.queryNames(status, null);
+            assertEquals(1, instances.size());
+            status = instances.iterator().next();
+            assertEquals(new ObjectName(client.getMBeanName()), status);
+            assertTrue(jmxServer.isRegistered(status));
+
+            String mode = jmxServer.getAttribute(status, "Mode").toString();
+            if (!mode.startsWith("client: ")) fail("unexpected mode " + mode);
+            assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+            assertEquals(false, jmxServer.getAttribute(status, "Running"));
+            jmxServer.invoke(status, "stop", null, null);
+            assertEquals(false, jmxServer.getAttribute(status, "Running"));
+            assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+            jmxServer.invoke(status, "start", null, null);
+            assertEquals(false, jmxServer.getAttribute(status, "Running"));
+            assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(status, "Status"));
+        } finally {
+            client.close();
+        }
+
+        assertTrue(!jmxServer.isRegistered(status));
+    }
+
+    /** The value of the client id system property is reported in the bean's Mode attribute as "client: <id>". */
+    @Test
+    public void testClientNoServer() throws Exception {
+        System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Foo");
+        final FailoverClient client = new FailoverClient("127.0.0.1", this.port, this.storeC);
+        client.start();
+
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        ObjectName status = new ObjectName(client.getMBeanName());
+        try {
+            assertTrue(jmxServer.isRegistered(status));
+            assertEquals("client: Foo", jmxServer.getAttribute(status, "Mode"));
+        } finally {
+            client.close();
+        }
+
+        assertTrue(!jmxServer.isRegistered(status));
+    }
+
+    /** Stops and restarts first the server, then the client, through JMX and checks the beans after each step. */
+    @Test
+    public void testClientAndServerEmptyConfig() throws Exception {
+        final FailoverServer server = new FailoverServer(this.port, this.storeS);
+        server.start();
+
+        System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Bar");
+        final FailoverClient client = new FailoverClient("127.0.0.1", this.port, this.storeC);
+        client.start();
+
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");
+        ObjectName clientStatus = new ObjectName(client.getMBeanName());
+        ObjectName serverStatus = new ObjectName(server.getMBeanName());
+        try {
+            // NOTE(review): 3 beans for 1 server + 1 client; presumably the server adds one per connected client - confirm
+            Set<ObjectName> instances = jmxServer.queryNames(status, null);
+            assertEquals(3, instances.size());
+
+            assertTrue(jmxServer.isRegistered(clientStatus));
+            assertTrue(jmxServer.isRegistered(serverStatus));
+
+            String mode = jmxServer.getAttribute(clientStatus, "Mode").toString();
+            if (!mode.startsWith("client: ")) fail("unexpected mode " + mode);
+            assertEquals("master", jmxServer.getAttribute(serverStatus, "Mode"));
+            assertEquals(true, jmxServer.getAttribute(serverStatus, "Running"));
+            assertEquals(true, jmxServer.getAttribute(clientStatus, "Running"));
+
+            // stop the master
+            jmxServer.invoke(serverStatus, "stop", null, null);
+            assertEquals(false, jmxServer.getAttribute(serverStatus, "Running"));
+            assertStatus(jmxServer, serverStatus, FailoverStatusMBean.STATUS_STOPPED);
+
+            // restart the master
+            jmxServer.invoke(serverStatus, "start", null, null);
+            assertEquals(true, jmxServer.getAttribute(serverStatus, "Running"));
+            assertEquals(true, jmxServer.getAttribute(clientStatus, "Running"));
+            assertStatus(jmxServer, serverStatus, FailoverStatusMBean.STATUS_STARTING);
+
+            // stop the slave
+            jmxServer.invoke(clientStatus, "stop", null, null);
+            assertEquals(true, jmxServer.getAttribute(serverStatus, "Running"));
+            assertEquals(false, jmxServer.getAttribute(clientStatus, "Running"));
+            assertEquals(FailoverStatusMBean.STATUS_STOPPED, jmxServer.getAttribute(clientStatus, "Status"));
+
+            // restart the slave
+            jmxServer.invoke(clientStatus, "start", null, null);
+            assertEquals(true, jmxServer.getAttribute(clientStatus, "Running"));
+            assertEquals(FailoverStatusMBean.STATUS_RUNNING, jmxServer.getAttribute(clientStatus, "Status"));
+        } finally {
+            client.close();
+            server.close();
+        }
+
+        assertTrue(!jmxServer.isRegistered(clientStatus));
+        assertTrue(!jmxServer.isRegistered(serverStatus));
+    }
+
+    /** Fails unless the bean's Status attribute is {@code expected} or the literal "Channel unregistered". */
+    private static void assertStatus(MBeanServer jmx, ObjectName bean, String expected) throws Exception {
+        String actual = jmx.getAttribute(bean, "Status").toString();
+        assertTrue("unexpected Status " + actual, expected.equals(actual) || "Channel unregistered".equals(actual));
+    }
+}

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+
+import org.apache.jackrabbit.oak.plugins.segment.DebugSegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/** Checks that a client sync run while the server store has createReadErrors set is made up by a later sync. */
+public class RecoverTest extends TestBase {
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+    }
+
+    @Test
+    public void testBrokenConnection() throws Exception {
+        NodeStore store = new SegmentNodeStore(storeS);
+        DebugSegmentStore s = new DebugSegmentStore(storeS);
+        final FailoverServer server = new FailoverServer(port, s);
+        s.createReadErrors = true;
+        server.start();
+        try {
+            addTestContent(store, "server");
+            FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC);
+            try {
+                cl.run(); // first sync, with createReadErrors still set on the server side
+                assertFalse("store are not expected to be equal", storeS.getHead().equals(storeC.getHead()));
+                s.createReadErrors = false;
+                cl.run();
+                assertEquals(storeS.getHead(), storeC.getHead());
+            } finally {
+                cl.close();
+            }
+        } finally {
+            server.close(); // runs even when content creation or the first sync throws
+        }
+    }
+}

Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java?rev=1620634&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java Tue Aug 26 15:30:56 2014
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+
+/** Shared fixture for the failover tests: a server FileStore plus up to two client FileStores. */
+public class TestBase {
+    /** Server port: the "failover.server.port" system property, or 52808 when it is not set. */
+    int port = Integer.parseInt(System.getProperty("failover.server.port", "52808"));
+
+    File directoryS;
+    FileStore storeS;
+    File directoryC;
+    FileStore storeC;
+    File directoryC2;
+    FileStore storeC2;
+
+    /** Creates the server store and the first client store, each on a directory from createTmpTargetDir. */
+    public void setUpServerAndClient() throws IOException {
+        directoryS = createTmpTargetDir("FailoverServerTest");
+        storeS = new FileStore(directoryS, 1, false);
+
+        directoryC = createTmpTargetDir("FailoverClientTest");
+        storeC = new FileStore(directoryC, 1, false);
+    }
+
+    /** Same as {@link #setUpServerAndClient()}, plus a second client store. */
+    public void setUpServerAndTwoClients() throws Exception {
+        setUpServerAndClient();
+        directoryC2 = createTmpTargetDir("FailoverClient2Test");
+        storeC2 = new FileStore(directoryC2, 1, false);
+    }
+
+    /** Closes the server and first client stores and deletes their directories. */
+    public void closeServerAndClient() {
+        release(storeS, directoryS);
+        release(storeC, directoryC);
+    }
+
+    /** Same as {@link #closeServerAndClient()}, plus the second client store. */
+    public void closeServerAndTwoClients() {
+        closeServerAndClient();
+        release(storeC2, directoryC2);
+    }
+
+    /** Null-safe, best-effort cleanup of one store and its directory (setUp may have failed midway). */
+    private static void release(FileStore store, File directory) {
+        if (store != null) {
+            store.close();
+        }
+        FileUtils.deleteQuietly(directory); // accepts null and never throws
+    }
+}