You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/10/17 14:36:14 UTC

svn commit: r1632554 [1/2] - in /jackrabbit/oak/branches/1.0: ./ oak-core/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ oak-doc/ oak-doc/src/site/ oak-doc/src/site/markdown/coldstandby/ oak-run/ oak-run/src/main/java/org/apache/jac...

Author: alexparvulescu
Date: Fri Oct 17 12:36:13 2014
New Revision: 1632554

URL: http://svn.apache.org/r1632554
Log:
OAK-1915 TarMK Cold Standby
 - merged to 1.0 branch 


Added:
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
      - copied unchanged from r1622479, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
    jackrabbit/oak/branches/1.0/oak-doc/src/site/markdown/coldstandby/
      - copied from r1626168, jackrabbit/oak/trunk/oak-doc/src/site/markdown/coldstandby/
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/   (props changed)
      - copied from r1606087, jackrabbit/oak/trunk/oak-tarmk-failover/
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
      - copied, changed from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
      - copied, changed from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
      - copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/
      - copied from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ClientFailoverStatusMBean.java
      - copied unchanged from r1626168, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ClientFailoverStatusMBean.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/resources/
      - copied from r1625963, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/resources/
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
      - copied unchanged from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
      - copied, changed from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
      - copied, changed from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
      - copied, changed from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
      - copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
      - copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
      - copied unchanged from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
      - copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
      - copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
      - copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
Removed:
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.config
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.config
Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    jackrabbit/oak/branches/1.0/oak-core/pom.xml
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    jackrabbit/oak/branches/1.0/oak-doc/   (props changed)
    jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml
    jackrabbit/oak/branches/1.0/oak-run/README.md
    jackrabbit/oak/branches/1.0/oak-run/pom.xml
    jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
    jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/RemoteSegmentLoader.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java
    jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java
    jackrabbit/oak/branches/1.0/pom.xml

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1601878,1606087,1606708,1606711,1607031-1607032,1607331,1607366,1607392,1609165,1614593,1620634,1622479,1623364,1623969,1624551,1624973,1624994,1625025,1625036,1625916,1625963,1626021,1626053,1626163,1626168,1626175,1626265

Modified: jackrabbit/oak/branches/1.0/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/pom.xml?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/pom.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-core/pom.xml Fri Oct 17 12:36:13 2014
@@ -75,6 +75,7 @@
               org.apache.jackrabbit.oak.plugins.observation.filter,
               org.apache.jackrabbit.oak.plugins.segment,
               org.apache.jackrabbit.oak.plugins.segment.http,
+              org.apache.jackrabbit.oak.plugins.segment.file,
               org.apache.jackrabbit.oak.plugins.value,
               org.apache.jackrabbit.oak.plugins.version,
               org.apache.jackrabbit.oak.spi.commit,

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Fri Oct 17 12:36:13 2014
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
 
 @Component(policy = ConfigurationPolicy.REQUIRE)
 public class SegmentNodeStoreService extends ProxyNodeStore
-        implements Observable {
+        implements Observable, SegmentStoreProvider {
 
     @Property(description="The unique name of this instance")
     public static final String NAME = "name";
@@ -80,6 +80,8 @@ public class SegmentNodeStoreService ext
     @Property(description = "TarMK compaction paused flag", boolValue = true)
     public static final String PAUSE_COMPACTION = "pauseCompaction";
 
+    @Property(description = "Flag indicating that this component will not register as a NodeStore but just as a NodeStoreProvider", boolValue = false)
+    public static final String STANDBY = "standby";
     /**
      * Boolean value indicating a blobStore is to be used
      */
@@ -101,7 +103,8 @@ public class SegmentNodeStoreService ext
             policy = ReferencePolicy.DYNAMIC)
     private volatile BlobStore blobStore;
 
-    private ServiceRegistration registration;
+    private ServiceRegistration storeRegistration;
+    private ServiceRegistration providerRegistration;
     private Registration revisionGCRegistration;
     private Registration blobGCRegistration;
     private WhiteboardExecutor executor;
@@ -165,7 +168,12 @@ public class SegmentNodeStoreService ext
 
         Dictionary<String, String> props = new Hashtable<String, String>();
         props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
-        registration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
+
+        boolean standby = toBoolean(lookup(context, STANDBY), false);
+        providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, props);
+        if (!standby) {
+            storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
+        }
 
         OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
         executor = new WhiteboardExecutor();
@@ -232,9 +240,13 @@ public class SegmentNodeStoreService ext
     }
 
     private void unregisterNodeStore() {
-        if(registration != null){
-            registration.unregister();
-            registration = null;
+        if(providerRegistration != null){
+            providerRegistration.unregister();
+            providerRegistration = null;
+        }
+        if(storeRegistration != null){
+            storeRegistration.unregister();
+            storeRegistration = null;
         }
         if (revisionGCRegistration != null) {
             revisionGCRegistration.unregister();

Propchange: jackrabbit/oak/branches/1.0/oak-doc/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk/oak-doc:r1626168,1626265

Modified: jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml Fri Oct 17 12:36:13 2014
@@ -46,6 +46,8 @@ under the License.
       <item href="known_issues.html" name="Known Issues" />
       <item href="dos_and_donts.html" name="Dos and don'ts" />
       <item href="when_things_go_wrong.html" name="When things go wrong" />
+      <item href="coldstandby/coldstandby.html" name="Cold Standby" />
+      <item href="FAQ.html" name="FAQ" />
     </menu>
     <menu name="Developing Oak">
       <item href="dev_getting_started.html" name="Getting Started" />

Modified: jackrabbit/oak/branches/1.0/oak-run/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/README.md?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/README.md (original)
+++ jackrabbit/oak/branches/1.0/oak-run/README.md Fri Oct 17 12:36:13 2014
@@ -9,17 +9,24 @@ The following runmodes are currently ava
     * debug     : Print status information about an Oak repository.
     * upgrade   : Upgrade from Jackrabbit 2.x repository to Oak.
     * server    : Run the Oak Server
+    * syncmaster  : Run a TarMK Cold Standby master
+    * syncslave   : Run a TarMK Cold Standby slave
 
 See the subsections below for more details on how to use these modes.
 
 Backup
 ------
 
-The 'backup' mode creates a backup from an existing oak repository. To start this
-mode, use:
+The 'backup' mode creates a backup from an existing oak repository. To start this mode, use:
 
     $ java -jar oak-run-*.jar backup /path/to/repository /path/to/backup
 
+Restore
+-------
+
+The 'restore' mode imports a backup of an existing oak repository. To start this mode, use:
+
+    $ java -jar oak-run-*.jar restore /path/to/repository /path/to/backup
 
 Debug
 -----
@@ -30,13 +37,40 @@ store. Currently this is only supported 
     $ java -jar oak-run-*.jar debug /path/to/oak/repository [id...]
 
 
+Syncmaster
+-------
+
+The 'syncmaster' mode starts a TarMK Cold Standby master listening on a TCP/IP port for connecting slaves.
+
+    $ java -jar oak-run-*.jar syncmaster [options] /path/to/TarMK
+    
+The following options are available:
+
+    --port 8023            - port to listen at
+    --admissible 127.0.0.1 - admissible client IP range or host name
+    --secure true          - use secure connections (requires a boolean argument)
+
+Syncslave
+-------
+
+The 'syncslave' mode starts a TarMK Cold Standby slave to create or update a continuous backup from a Cold Standby master.
+
+    $ java -jar oak-run-*.jar syncslave [options] /path/to/TarMK
+
+The following options are available:
+
+    --port 8023            - port to connect to
+    --host 127.0.0.1       - host to connect to
+    --secure true          - use secure connections (requires a boolean argument)
+    --interval 5           - schedule the slave to run continuously, connecting every n seconds
+
 Compact
 -------
 
 The 'compact' mode runs the segment compaction operation on the provided TarMK
 repository. To start this mode, use:
 
-    $ java -jar oak-run-*.jar compact /path/to/oak/repository
+    $ java -jar oak-run-*.jar compact /path/to/TarMK
 
 Checkpoints
 -----------

Modified: jackrabbit/oak/branches/1.0/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/pom.xml?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/pom.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-run/pom.xml Fri Oct 17 12:36:13 2014
@@ -140,6 +140,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-tarmk-failover</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
       <version>1.3.175</version>
@@ -176,7 +181,7 @@
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>
-      <version>4.4</version>
+      <version>4.6</version>
     </dependency>
     <dependency>
       <groupId>ch.qos.logback</groupId>

Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java Fri Oct 17 12:36:13 2014
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
@@ -76,7 +77,7 @@ public class BenchmarkRunner {
                 .withOptionalArg().ofType(Boolean.class).defaultsTo(Boolean.FALSE);
         OptionSpec<Integer> numberOfUsers = parser.accepts("numberOfUsers")
                 .withOptionalArg().ofType(Integer.class).defaultsTo(10000);
-
+        OptionSpec<String> nonOption = parser.nonOptions();
         OptionSet options = parser.parse(args);
         int cacheSize = cache.value(options);
         RepositoryFixture[] allFixtures = new RepositoryFixture[] {
@@ -204,7 +205,7 @@ public class BenchmarkRunner {
                     flatStructure.value(options))
         };
 
-        Set<String> argset = Sets.newHashSet(options.nonOptionArguments());
+        Set<String> argset = Sets.newHashSet(nonOption.values(options));
         List<RepositoryFixture> fixtures = Lists.newArrayList();
         for (RepositoryFixture fixture : allFixtures) {
             if (argset.remove(fixture.toString())) {

Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Fri Oct 17 12:36:13 2014
@@ -16,6 +16,9 @@
  */
 package org.apache.jackrabbit.oak.run;
 
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Arrays.asList;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,16 +32,12 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.jcr.Repository;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
@@ -46,6 +45,7 @@ import joptsimple.OptionSpec;
 import org.apache.jackrabbit.core.RepositoryContext;
 import org.apache.jackrabbit.core.config.RepositoryConfig;
 import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -55,6 +55,8 @@ import org.apache.jackrabbit.oak.fixture
 import org.apache.jackrabbit.oak.http.OakServlet;
 import org.apache.jackrabbit.oak.jcr.Jcr;
 import org.apache.jackrabbit.oak.plugins.backup.FileStoreBackup;
+import org.apache.jackrabbit.oak.plugins.backup.FileStoreRestore;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
@@ -62,6 +64,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -76,7 +80,11 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
-import static com.google.common.collect.Sets.newHashSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
 
 public class Main {
 
@@ -104,6 +112,9 @@ public class Main {
             case BACKUP:
                 backup(args);
                 break;
+            case RESTORE:
+                restore(args);
+                break;
             case BENCHMARK:
                 BenchmarkRunner.main(args);
                 break;
@@ -119,6 +130,12 @@ public class Main {
             case UPGRADE:
                 upgrade(args);
                 break;
+            case SYNCSLAVE:
+                syncSlave(args);
+                break;
+            case SYNCMASTER:
+                syncMaster(args);
+                break;
             case CHECKPOINTS:
                 checkpoints(args);
                 break;
@@ -169,6 +186,222 @@ public class Main {
         }
     }
 
+    private static void restore(String[] args) throws IOException {
+        if (args.length == 2) {
+            // TODO: enable restore for other node store implementations
+            FileStore store = new FileStore(new File(args[0]), 256, false);
+            File target = new File(args[1]);
+            try {
+                FileStoreRestore.restore(target, new SegmentNodeStore(store));
+            } catch (CommitFailedException e) {
+                throw new IOException(e);
+            }
+            store.close();
+        } else {
+            System.err.println("usage: restore <repository> <backup>");
+            System.exit(1);
+        }
+    }
+
+    //TODO react to state changes of FailoverClient (triggered via JMX), once the state model of FailoverClient is complete.
+    private static class ScheduledSyncService extends AbstractScheduledService {
+
+        private final FailoverClient failoverClient;
+        private final int interval;
+
+        public ScheduledSyncService(FailoverClient failoverClient, int interval) {
+            this.failoverClient = failoverClient;
+            this.interval = interval;
+        }
+
+        @Override
+        public void runOneIteration() throws Exception {
+            failoverClient.run();
+        }
+
+        @Override
+        protected Scheduler scheduler() {
+            return Scheduler.newFixedDelaySchedule(0, interval, TimeUnit.SECONDS);
+        }
+    }
+
+
+    private static void syncSlave(String[] args) throws Exception {
+
+        final String defaultHost = "127.0.0.1";
+        final int defaultPort = 8023;
+
+        final OptionParser parser = new OptionParser();
+        final OptionSpec<String> host = parser.accepts("host", "master host").withRequiredArg().ofType(String.class).defaultsTo(defaultHost);
+        final OptionSpec<Integer> port = parser.accepts("port", "master port").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort);
+        final OptionSpec<Integer> interval = parser.accepts("interval", "interval between successive executions").withRequiredArg().ofType(Integer.class);
+        final OptionSpec<Boolean> secure = parser.accepts("secure", "use secure connections").withRequiredArg().ofType(Boolean.class);
+        final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), "show help").forHelp();
+        final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCSLAVE + " <path to repository>");
+
+        final OptionSet options = parser.parse(args);
+        final List<String> nonOptions = nonOption.values(options);
+
+        if (options.has(help)) {
+            parser.printHelpOn(System.out);
+            System.exit(0);
+        }
+
+        if (nonOptions.isEmpty()) {
+            parser.printHelpOn(System.err);
+            System.exit(1);
+        }
+
+        FileStore store = null;
+        FailoverClient failoverClient = null;
+        try {
+            store = new FileStore(new File(nonOptions.get(0)), 256);
+            failoverClient = new FailoverClient(
+                    options.has(host)? options.valueOf(host) : defaultHost,
+                    options.has(port)? options.valueOf(port) : defaultPort,
+                    store,
+                    options.has(secure) && options.valueOf(secure));
+            if (!options.has(interval)) {
+                failoverClient.run();
+            } else {
+                ScheduledSyncService syncService = new ScheduledSyncService(failoverClient, options.valueOf(interval));
+                syncService.startAsync();
+                syncService.awaitTerminated();
+            }
+        } finally {
+            if (store != null) {
+                store.close();
+            }
+            if (failoverClient != null) {
+                failoverClient.close();
+            }
+        }
+    }
+
+    private static void syncMaster(String[] args) throws Exception {
+
+        final int defaultPort = 8023;
+
+        final OptionParser parser = new OptionParser();
+        final OptionSpec<Integer> port = parser.accepts("port", "port to listen").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort);
+        final OptionSpec<Boolean> secure = parser.accepts("secure", "use secure connections").withRequiredArg().ofType(Boolean.class);
+        final OptionSpec<String> admissible = parser.accepts("admissible", "list of admissible slave host names or ip ranges").withRequiredArg().ofType(String.class);
+        final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), "show help").forHelp();
+        final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCMASTER + " <path to repository>");
+
+        final OptionSet options = parser.parse(args);
+        final List<String> nonOptions = nonOption.values(options);
+
+        if (options.has(help)) {
+            parser.printHelpOn(System.out);
+            System.exit(0);
+        }
+
+        if (nonOptions.isEmpty()) {
+            parser.printHelpOn(System.err);
+            System.exit(1);
+        }
+
+
+        List<String> admissibleSlaves = options.has(admissible) ? options.valuesOf(admissible) : Collections.EMPTY_LIST;
+
+        FileStore store = null;
+        FailoverServer failoverServer = null;
+        try {
+            store = new FileStore(new File(nonOptions.get(0)), 256);
+            failoverServer = new FailoverServer(
+                    options.has(port)? options.valueOf(port) : defaultPort,
+                    store,
+                    admissibleSlaves.toArray(new String[0]),
+                    options.has(secure) && options.valueOf(secure));
+            failoverServer.startAndWait();
+        } finally {
+            if (store != null) {
+                store.close();
+            }
+            if (failoverServer != null) {
+                failoverServer.close();
+            }
+        }
+    }
+
+    public static NodeStore bootstrapNodeStore(String[] args, Closer closer,
+            String h) throws IOException {
+        //TODO add support for other NodeStore flags
+        OptionParser parser = new OptionParser();
+        OptionSpec<Integer> clusterId = parser
+                .accepts("clusterId", "MongoMK clusterId").withRequiredArg()
+                .ofType(Integer.class).defaultsTo(0);
+        OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"),
+                "show help").forHelp();
+        OptionSpec<String> nonOption = parser
+                .nonOptions(h);
+
+        OptionSet options = parser.parse(args);
+        List<String> nonOptions = nonOption.values(options);
+
+        if (options.has(help)) {
+            parser.printHelpOn(System.out);
+            System.exit(0);
+        }
+
+        if (nonOptions.isEmpty()) {
+            parser.printHelpOn(System.err);
+            System.exit(1);
+        }
+
+        String src = nonOptions.get(0);
+        if (src.startsWith(MongoURI.MONGODB_PREFIX)) {
+            MongoClientURI uri = new MongoClientURI(src);
+            if (uri.getDatabase() == null) {
+                System.err.println("Database missing in MongoDB URI: "
+                        + uri.getURI());
+                System.exit(1);
+            }
+            MongoConnection mongo = new MongoConnection(uri.getURI());
+            closer.register(asCloseable(mongo));
+            DocumentNodeStore store = new DocumentMK.Builder()
+                    .setMongoDB(mongo.getDB())
+                    .setClusterId(clusterId.value(options)).getNodeStore();
+            closer.register(asCloseable(store));
+            return store;
+        } else {
+            FileStore fs = new FileStore(new File(src), 256, false);
+            closer.register(asCloseable(fs));
+            return new SegmentNodeStore(fs);
+        }
+    }
+
+    private static Closeable asCloseable(final FileStore fs) {
+        return new Closeable() {
+
+            @Override
+            public void close() throws IOException {
+                fs.close();
+            }
+        };
+    }
+
+    private static Closeable asCloseable(final DocumentNodeStore dns) {
+        return new Closeable() {
+
+            @Override
+            public void close() throws IOException {
+                dns.dispose();
+            }
+        };
+    }
+
+    private static Closeable asCloseable(final MongoConnection con) {
+        return new Closeable() {
+
+            @Override
+            public void close() throws IOException {
+                con.close();
+            }
+        };
+    }
+
     private static void compact(String[] args) throws IOException {
         if (args.length != 1) {
             System.err.println("usage: compact <path>");
@@ -456,10 +689,10 @@ public class Main {
     private static void upgrade(String[] args) throws Exception {
         OptionParser parser = new OptionParser();
         parser.accepts("datastore", "keep data store");
-
+        OptionSpec<String> nonOption = parser.nonOptions();
         OptionSet options = parser.parse(args);
 
-        List<String> argList = options.nonOptionArguments();
+        List<String> argList = nonOption.values(options);
         if (argList.size() == 2 || argList.size() == 3) {
             File dir = new File(argList.get(0));
             File xml = new File(dir, "repository.xml");
@@ -527,12 +760,12 @@ public class Main {
         OptionSpec<Integer> port = parser.accepts("port", "MongoDB port").withRequiredArg().ofType(Integer.class).defaultsTo(27017);
         OptionSpec<String> dbName = parser.accepts("db", "MongoDB database").withRequiredArg();
         OptionSpec<Integer> clusterIds = parser.accepts("clusterIds", "Cluster Ids").withOptionalArg().ofType(Integer.class).withValuesSeparatedBy(',');
-
+        OptionSpec<String> nonOption = parser.nonOptions();
         OptionSet options = parser.parse(args);
 
         OakFixture oakFixture;
 
-        List<String> arglist = options.nonOptionArguments();
+        List<String> arglist = nonOption.values(options);
         String uri = (arglist.isEmpty()) ? defaultUri : arglist.get(0);
         String fix = (arglist.size() <= 1) ? OakFixture.OAK_MEMORY : arglist.get(1);
 
@@ -672,11 +905,14 @@ public class Main {
     public enum Mode {
 
         BACKUP("backup"),
+        RESTORE("restore"),
         BENCHMARK("benchmark"),
         DEBUG("debug"),
         COMPACT("compact"),
         SERVER("server"),
         UPGRADE("upgrade"),
+        SYNCSLAVE("syncSlave"),
+        SYNCMASTER("syncmaster"),
         CHECKPOINTS("checkpoints");
 
         private final String name;

Propchange: jackrabbit/oak/branches/1.0/oak-tarmk-failover/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Oct 17 12:36:13 2014
@@ -0,0 +1,5 @@
+target
+.*
+*.iml
+*.ipr
+*.iws

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md Fri Oct 17 12:36:13 2014
@@ -7,8 +7,8 @@ Failover
 The component should be installed when failover support is needed.
 
 The setup is expected to be: one master to one/many slaves nodes.
-The slave will periodically poll the master for the head state over http
-on a custom port, if it changed, it should pull in all the new segments.
+The slave will periodically poll the master for the head state; if it has
+changed, the slave will pull in all the new segments since the last sync.
 
 Setup in OSGi
 -------------
@@ -24,7 +24,7 @@ Master host represents the master host i
 Interval represents how often the sync thread should run, in seconds.
 
 See examples in the osgi-conf folder for each run mode. To install a new OSGI config in the sling launcher,
-you only need to create a new folder called 'install' in the sling.home folder and copy the configs there.
+you only need to create a new folder called 'install' in the sling.home folder and copy the specific config there.
 
 TODO
 ----
@@ -32,7 +32,6 @@ TODO
   - timeout handling doesn't cover everything on both server and slave
   - error handling on the slave still has some issues (the slave hangs)
   - maybe enable compression of the segments over the wire
-  - maybe add a checksum to the segment encoder/decoder to verify the integrity of the transfer
   - slave runmode could possibly be a read-only mode (no writes permitted)
 
 License

Copied: jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?p2=jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&p1=jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&r1=1606708&r2=1632554&rev=1632554&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg Fri Oct 17 12:36:13 2014
@@ -1 +1 @@
-mode="master"
+mode=master

Copied: jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?p2=jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&p1=jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&r1=1606708&r2=1632554&rev=1632554&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg Fri Oct 17 12:36:13 2014
@@ -1,2 +1,2 @@
-mode="slave"
-master.host="127.0.0.1"
\ No newline at end of file
+mode=slave
+master.host=127.0.0.1
\ No newline at end of file

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml Fri Oct 17 12:36:13 2014
@@ -11,14 +11,13 @@
   OF ANY KIND, either express or implied. See the License for the specific 
   language governing permissions and limitations under the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd ">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd ">
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
     <groupId>org.apache.jackrabbit</groupId>
     <artifactId>oak-parent</artifactId>
-    <version>1.1-SNAPSHOT</version>
+    <version>1.0.8-SNAPSHOT</version>
     <relativePath>../oak-parent/pom.xml</relativePath>
   </parent>
 
@@ -28,6 +27,7 @@
   <description>Oak TarMK failover module</description>
 
   <properties>
+      <netty-version>4.0.23.Final</netty-version>
   </properties>
 
   <build>
@@ -85,9 +85,30 @@
             combine.children="append">
             <failover.server.port>${failover.server.port}</failover.server.port>
           </systemPropertyVariables>
+          <excludes>
+            <!-- excluding long running tests -->
+            <exclude>**/BrokenNetworkTest.java</exclude>
+            <exclude>**/FailoverIPRangeTest.java</exclude>
+            <exclude>**/BulkTest.java</exclude>
+            <exclude>**/MBeanTest.java</exclude>
+          </excludes>
         </configuration>
       </plugin>
     </plugins>
+    <pluginManagement>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg</exclude>
+                        <exclude>osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </pluginManagement>
   </build>
 
   <dependencies>
@@ -123,31 +144,31 @@
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-codec</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-handler</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
 
@@ -170,6 +191,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
       <scope>test</scope>

Copied: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java (from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java?p2=jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java&p1=jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java&r1=1620634&r2=1632554&rev=1632554&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java Fri Oct 17 12:36:13 2014
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class CommunicationObserver {
+    private static final int MAX_CLIENT_STATISTICS = 10;
 
     private class CommunicationPartnerMBean implements ObservablePartnerMBean {
         private final ObjectName mbeanName;
@@ -45,6 +46,8 @@ public class CommunicationObserver {
         public Date lastSeen;
         public String remoteAddress;
         public int remotePort;
+        public long segmentsSent;
+        public long segmentBytesSent;
 
         public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
             this.clientName = clientName;
@@ -79,6 +82,16 @@ public class CommunicationObserver {
         public String getLastSeenTimestamp() {
             return this.lastSeen == null ? null : this.lastSeen.toString();
         }
+
+        @Override
+        public long getTransferredSegments() { // value of segmentsSent, which didSendSegmentBytes() increments once per call
+            return this.segmentsSent;
+        }
+
+        @Override
+        public long getTransferredSegmentBytes() { // value of segmentBytesSent, the sum of the sizes passed to didSendSegmentBytes()
+            return this.segmentBytesSent;
+        }
     }
 
     private static final Logger log = LoggerFactory
@@ -92,22 +105,28 @@ public class CommunicationObserver {
         this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
     }
 
-    public void unregister() {
+    private void unregister(CommunicationPartnerMBean m) {
         final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.unregisterMBean(m.getMBeanName());
+        }
+        catch (Exception e) {
+            log.error("error unregistering mbean for client '" + m.getName() + "'", e);
+        }
+    }
+
+    public void unregister() {
         for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
-            try {
-                jmxServer.unregisterMBean(m.getMBeanName());
-            }
-            catch (Exception e) {
-                log.error("error unregistering mbean for client '" + m.getName() + "'", e);
-            }
+            unregister(m);
         }
     }
 
     public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
+        log.debug("got message '" + request + "' from client " + client);
         CommunicationPartnerMBean m = this.partnerDetails.get(client);
         boolean register = false;
         if (m == null) {
+            cleanUp();
             m = new CommunicationPartnerMBean(client);
             m.remoteAddress = remote.getAddress().getHostAddress();
             m.remotePort = remote.getPort();
@@ -127,7 +146,35 @@ public class CommunicationObserver {
         }
     }
 
+    public void didSendSegmentBytes(String client, int size) { // counts one sent segment of 'size' bytes in the statistics of 'client'
+        log.debug("did send segment with " + size + " bytes to client " + client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        if (m == null) return; // no statistics entry (client never seen, or already evicted by cleanUp()): nothing to update
+        m.segmentsSent++;
+        m.segmentBytesSent += size; // m is the instance stored in the map, so no put() is needed
+    }
+
     public String getID() {
         return this.identifier;
     }
+
+    // helper
+
+    private void cleanUp() { // evicts partners until fewer than MAX_CLIENT_STATISTICS remain; called before a new partner is added
+        while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
+            CommunicationPartnerMBean oldestEntry = oldestEntry();
+            if (oldestEntry == null) return; // defensive: oldestEntry() only returns null when no partners are tracked
+            log.info("housekeeping: removing statistics for " + oldestEntry.getName());
+            unregister(oldestEntry); // remove its MBean from the platform MBean server before dropping the entry
+            this.partnerDetails.remove(oldestEntry.getName());
+        }
+    }
+
+    private CommunicationPartnerMBean oldestEntry() { // partner with the earliest lastSeen, or null when none are tracked
+        CommunicationPartnerMBean oldest = null;
+        for (CommunicationPartnerMBean candidate : this.partnerDetails.values()) {
+            if (oldest == null || candidate.lastSeen.before(oldest.lastSeen)) oldest = candidate;
+        }
+        return oldest;
+    }
 }

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Fri Oct 17 12:36:13 2014
@@ -27,22 +27,36 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.SnappyFramedDecoder;
 import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 
 import java.io.Closeable;
+import java.lang.management.ManagementFactory;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.ClientFailoverStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class FailoverClient implements Runnable, Closeable {
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import javax.net.ssl.SSLException;
+
+public final class FailoverClient implements ClientFailoverStatusMBean, Runnable, Closeable {
+    public static final String CLIENT_ID_PROPERTY_NAME = "failOverID";
 
     private static final Logger log = LoggerFactory
             .getLogger(FailoverClient.class);
@@ -52,70 +66,174 @@ public final class FailoverClient implem
     private int readTimeoutMs = 10000;
 
     private final FailoverStore store;
+    private final CommunicationObserver observer;
     private FailoverClientHandler handler;
     private EventLoopGroup group;
     private EventExecutorGroup executor;
+    private SslContext sslContext;
+    private boolean active = false;
+    private boolean running;
+    private int failedRequests;
+    private long lastSuccessfulRequest;
+    private volatile String state;
+    private final Object sync = new Object();
+
+    public FailoverClient(String host, int port, SegmentStore store) throws SSLException {
+        this(host, port, store, false);
+    }
 
-    public FailoverClient(String host, int port, SegmentStore store) {
+    public FailoverClient(String host, int port, SegmentStore store, boolean secure) throws SSLException {
+        this.state = STATUS_INITIALIZING;
+        this.lastSuccessfulRequest = -1; // negative means "no successful sync yet"; getSecondsSinceLastSuccess() then reports -1
+        this.failedRequests = 0;
         this.host = host;
         this.port = port;
+        if (secure) { // secure=true: an SSL handler built from this context is put first in the channel pipeline by run()
+            this.sslContext = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE); // NOTE(review): this trust manager accepts any server certificate without verification - confirm that is acceptable outside tests
+        }
         this.store = new FailoverStore(store);
+        String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
+        this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s); // client ID: the system property if set and non-empty, otherwise a random UUID
+
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.registerMBean(new StandardMBean(this, ClientFailoverStatusMBean.class), new ObjectName(this.getMBeanName()));
+        }
+        catch (Exception e) {
+            log.error("cannot register failover status mbean", e); // best effort: the client stays usable without its status MBean
+        }
     }
 
-    public void run() {
+    public String getMBeanName() { // JMX object name string: FailoverStatusMBean.JMX_NAME plus an "id" key holding the quoted observer ID
+        return FailoverStatusMBean.JMX_NAME + ",id=\"" + this.observer.getID() + "\"";
+    }
+
+    public void close() { // stops the client, unregisters its MBeans and shuts down both Netty executor groups
+        stop();
+        state = STATUS_CLOSING;
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.unregisterMBean(new ObjectName(this.getMBeanName()));
+        }
+        catch (Exception e) {
+            log.error("cannot unregister failover status mbean", e); // best effort: continue closing even if JMX cleanup fails
+        }
+        observer.unregister(); // also drop the per-partner MBeans held by the observer
+        if (group != null && !group.isShuttingDown()) { // group is null if run() was never called
+            group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+                    .syncUninterruptibly();
+        }
+        if (executor != null && !executor.isShuttingDown()) {
+            executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+                    .syncUninterruptibly();
+        }
+        state = STATUS_CLOSED;
+    }
 
-        this.executor = new DefaultEventExecutorGroup(4);
-        this.handler = new FailoverClientHandler(this.store, executor);
+    public void run() {
 
-        group = new NioEventLoopGroup();
-        Bootstrap b = new Bootstrap();
-        b.group(group);
-        b.channel(NioSocketChannel.class);
-        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, readTimeoutMs);
-        b.option(ChannelOption.TCP_NODELAY, true);
-        b.option(ChannelOption.SO_REUSEADDR, true);
-        b.option(ChannelOption.SO_KEEPALIVE, true);
-
-        b.handler(new ChannelInitializer<SocketChannel>() {
-            @Override
-            public void initChannel(SocketChannel ch) throws Exception {
-                ChannelPipeline p = ch.pipeline();
-
-                // p.addLast(new LoggingHandler(LogLevel.INFO));
-                // Enable stream compression
-                // p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
-                // p.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
-
-                // WriteTimeoutHandler & ReadTimeoutHandler
-                p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
-                        readTimeoutMs, TimeUnit.MILLISECONDS));
-
-                p.addLast(new StringEncoder(CharsetUtil.UTF_8));
-                p.addLast(new RecordIdDecoder(store));
-                p.addLast(executor, handler);
+        Bootstrap b;
+        synchronized (this.sync) {
+            if (this.active) {
+                return;
             }
-        });
+            state = STATUS_STARTING;
+            executor = new DefaultEventExecutorGroup(4);
+            handler = new FailoverClientHandler(this.store, executor, this.observer);
+            group = new NioEventLoopGroup();
+
+            b = new Bootstrap();
+            b.group(group);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, readTimeoutMs);
+            b.option(ChannelOption.TCP_NODELAY, true);
+            b.option(ChannelOption.SO_REUSEADDR, true);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+
+            b.handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    ChannelPipeline p = ch.pipeline();
+                    if (sslContext != null) {
+                        p.addLast(sslContext.newHandler(ch.alloc()));
+                    }
+                    // WriteTimeoutHandler & ReadTimeoutHandler
+                    p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
+                            readTimeoutMs, TimeUnit.MILLISECONDS));
+                    p.addLast(new StringEncoder(CharsetUtil.UTF_8));
+                    p.addLast(new SnappyFramedDecoder(true));
+                    p.addLast(new RecordIdDecoder(store));
+                    p.addLast(executor, handler);
+                }
+            });
+            state = STATUS_RUNNING;
+            this.running = true;
+            this.active = true;
+        }
+
         try {
             // Start the client.
             ChannelFuture f = b.connect(host, port).sync();
             // Wait until the connection is closed.
             f.channel().closeFuture().sync();
+            this.failedRequests = 0;
+            this.lastSuccessfulRequest = System.currentTimeMillis() / 1000;
         } catch (Exception e) {
+            this.failedRequests++;
             log.error("Failed synchronizing state.", e);
+            stop();
         } finally {
-            close();
+            synchronized (this.sync) {
+                this.active = false;
+            }
         }
     }
 
     @Override
-    public void close() {
-        if (group != null && !group.isShuttingDown()) {
-            group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
-        }
-        if (executor != null && !executor.isShuttingDown()) {
-            executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
+    public String getMode() { // the literal prefix "client: " followed by the observer's ID
+        return "client: " + this.observer.getID();
+    }
+
+    @Override
+    public boolean isRunning() { return running;} // flag set to true by run() and cleared by stop()
+
+    @Override
+    public void start() { // invokes run() directly on the calling thread, unless the running flag is already set
+        if (!running) run();
+    }
+
+    @Override
+    public void stop() {
+        //TODO running flag doesn't make sense this way, since run() is usually scheduled to be called repeatedly.
+        if (running) {
+            running = false;
+            state = STATUS_STOPPED;
         }
     }
+
+    @Override
+    public String getStatus() { // most recent STATUS_* value written by the constructor, run(), stop() or close()
+        return this.state;
+    }
+
+    @Override
+    public int getFailedRequests() { // counter that run() increments on an exception and resets to 0 after a successful pass
+        return this.failedRequests;
+    }
+
+    @Override
+    public int getSecondsSinceLastSuccess() { // -1 while no successful sync has been recorded yet
+        final long nowSeconds = System.currentTimeMillis() / 1000;
+        return this.lastSuccessfulRequest < 0 ? -1 : (int)(nowSeconds - this.lastSuccessfulRequest);
+    }
+
+    @Override
+    public int calcFailedRequests() { // delegates to getFailedRequests() and returns the same value
+        return this.getFailedRequests();
+    }
+
+    @Override
+    public int calcSecondsSinceLastSuccess() { // delegates to getSecondsSinceLastSuccess() and returns the same value
+        return this.getSecondsSinceLastSuccess();
+    }
 }

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java Fri Oct 17 12:36:13 2014
@@ -19,18 +19,17 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover.client;
 
 import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetHeadReq;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
@@ -45,29 +44,31 @@ public class FailoverClientHandler exten
 
     private final FailoverStore store;
     private final EventExecutorGroup executor;
+    private final CommunicationObserver observer;
     private EventExecutorGroup preloaderExecutor;
     private EventExecutorGroup loaderExecutor;
 
     private ChannelHandlerContext ctx;
 
-    private Promise<RecordId> headPromise;
-
     public FailoverClientHandler(final FailoverStore store,
-            EventExecutorGroup executor) {
+            EventExecutorGroup executor, CommunicationObserver observer) {
         this.store = store;
         this.executor = executor;
+        this.observer = observer;
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         this.ctx = ctx;
-        sendHeadRequest();
+        log.debug("sending head request");
+        ctx.writeAndFlush(newGetHeadReq(this.observer.getID()));
+        log.debug("did send head request");
     }
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
             throws Exception {
-        headPromise.setSuccess(msg);
+        setHead(msg);
     };
 
     @Override
@@ -75,28 +76,7 @@ public class FailoverClientHandler exten
         ctx.flush();
     }
 
-    private synchronized void sendHeadRequest() {
-        headPromise = ctx.executor().newPromise();
-        headPromise.addListener(new GenericFutureListener<Future<RecordId>>() {
-            @Override
-            public void operationComplete(Future<RecordId> future) {
-                if (future.isSuccess()) {
-                    try {
-                        setHead(future.get());
-                    } catch (Exception e) {
-                        exceptionCaught(ctx, e);
-                    }
-                } else {
-                    exceptionCaught(ctx, future.cause());
-                }
-            }
-        });
-        ctx.writeAndFlush(newGetHeadReq()).addListener(
-                new FailedRequestListener(headPromise));
-    }
-
     synchronized void setHead(RecordId head) {
-        headPromise = null;
 
         if (store.getHead().getRecordId().equals(head)) {
             // all sync'ed up
@@ -104,6 +84,8 @@ public class FailoverClientHandler exten
             ctx.close();
             return;
         }
+
+        log.debug("updating current head to " + head);
         ctx.pipeline().remove(RecordIdDecoder.class);
         ctx.pipeline().remove(this);
         ctx.pipeline().addLast(new SegmentDecoder(store));
@@ -114,17 +96,12 @@ public class FailoverClientHandler exten
 
         loaderExecutor = new DefaultEventExecutorGroup(4);
         SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
-                preloaderExecutor, loaderExecutor);
+                preloaderExecutor, loaderExecutor, this.observer.getID());
         ctx.pipeline().addLast(loaderExecutor, h2);
 
         h1.channelActive(ctx);
         h2.channelActive(ctx);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        log.error("Failed synchronizing state.", cause);
-        close();
+        log.debug("updating current head finished");
     }
 
     @Override

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java Fri Oct 17 12:36:13 2014
@@ -19,21 +19,21 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover.client;
 
 import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetSegmentReq;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.EventExecutorGroup;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
 import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
 import org.apache.jackrabbit.oak.plugins.segment.failover.store.RemoteSegmentLoader;
@@ -48,6 +48,7 @@ public class SegmentLoaderHandler extend
             .getLogger(SegmentLoaderHandler.class);
 
     private final FailoverStore store;
+    private final String clientID;
     private final RecordId head;
     private final EventExecutorGroup preloaderExecutor;
     private final EventExecutorGroup loaderExecutor;
@@ -60,11 +61,13 @@ public class SegmentLoaderHandler extend
 
     public SegmentLoaderHandler(final FailoverStore store, RecordId head,
             EventExecutorGroup preloaderExecutor,
-            EventExecutorGroup loaderExecutor) {
+            EventExecutorGroup loaderExecutor,
+            String clientID) {
         this.store = store;
         this.head = head;
         this.preloaderExecutor = preloaderExecutor;
         this.loaderExecutor = loaderExecutor;
+        this.clientID = clientID;
     }
 
     @Override
@@ -77,12 +80,13 @@ public class SegmentLoaderHandler extend
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
             throws Exception {
         if (evt instanceof SegmentReply) {
+            //log.debug("offering segment " + ((SegmentReply) evt).getSegment());
             segment.offer(((SegmentReply) evt).getSegment());
         }
     }
 
     private void initSync() {
-        log.debug("new head id " + head);
+        log.info("new head id " + head);
         long t = System.currentTimeMillis();
 
         try {
@@ -91,38 +95,66 @@ public class SegmentLoaderHandler extend
             SegmentNodeBuilder builder = before.builder();
 
             SegmentNodeState current = new SegmentNodeState(head);
-            current.compareAgainstBaseState(before, new ApplyDiff(builder));
-
+            do {
+                try {
+                    current.compareAgainstBaseState(before, new ApplyDiff(builder));
+                    break;
+                }
+                catch (SegmentNotFoundException e) {
+                    // the segment is locally damaged or not present anymore
+                    // let's try to read it from the master again
+                    String id = e.getSegmentId();
+                    Segment s = readSegment(e.getSegmentId());
+                    if (s == null) {
+                        log.warn("can't read locally corrupt segment " + id + " from master");
+                        throw e;
+                    }
+
+                    log.info("did reread locally corrupt segment " + id + " with size " + s.size());
+                    ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size());
+                    try {
+                        s.writeTo(bout);
+                    }
+                    catch (IOException f) {
+                        log.error("can't wrap segment to output stream", f);
+                        throw e;
+                    }
+                    store.writeSegment(s.getSegmentId(), bout.toByteArray(), 0, s.size());
+                }
+            } while(true);
             boolean ok = store.setHead(before, builder.getNodeState());
             log.info("#updated state (set head {}) in {}ms.", ok,
                     System.currentTimeMillis() - t);
         } finally {
             close();
         }
+        log.debug("returning initSync");
     }
 
     @Override
-    public Segment readSegment(final SegmentId id) {
-        ctx.writeAndFlush(newGetSegmentReq(id)).addListener(reqListener);
+    public Segment readSegment(final String id) {
+        ctx.writeAndFlush(newGetSegmentReq(this.clientID, id));
         return getSegment();
     }
 
-    private final ChannelFutureListener reqListener = new ChannelFutureListener() {
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+            throws Exception {
+        log.warn("Closing channel. Got exception: " + cause, cause); // pass cause so the stack trace is logged
+        ctx.close();
+    }
 
-        @Override
-        public void operationComplete(ChannelFuture future) {
-            if (!future.isSuccess()) {
-                exceptionCaught(ctx, future.cause());
-            }
-        }
-    };
+    // implementation of RemoteSegmentLoader
 
     public Segment getSegment() {
         boolean interrupted = false;
         try {
             for (;;) {
                 try {
-                    return segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+                    log.debug("polling segment");
+                    Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+                    log.debug("returning segment " + s);
+                    return s;
                 } catch (InterruptedException ignore) {
                     interrupted = true;
                 }
@@ -132,15 +164,9 @@ public class SegmentLoaderHandler extend
                 Thread.currentThread().interrupt();
             }
         }
-    }
 
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        log.error("Failed synchronizing state.", cause);
-        close();
     }
 
-    @Override
     public void close() {
         ctx.close();
         if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
@@ -153,7 +179,6 @@ public class SegmentLoaderHandler extend
         }
     }
 
-    @Override
     public boolean isClosed() {
         return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor
                 .isShutdown()));

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java Fri Oct 17 12:36:13 2014
@@ -23,13 +23,18 @@ import io.netty.channel.SimpleChannelInb
 
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SegmentPreLoaderHandler extends
         SimpleChannelInboundHandler<Segment> {
+    private static final Logger log = LoggerFactory
+            .getLogger(SegmentPreLoaderHandler.class);
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Segment msg)
             throws Exception {
+        log.info("fire new segment reply for " + msg.getSegmentId());
         ctx.fireUserEventTriggered(new SegmentReply(msg));
     }
 

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java Fri Oct 17 12:36:13 2014
@@ -18,22 +18,65 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
 
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-
 public class Messages {
 
     public static final byte HEADER_RECORD = 0x00;
     public static final byte HEADER_SEGMENT = 0x01;
 
     public static final String GET_HEAD = "h";
-
     public static final String GET_SEGMENT = "s.";
 
-    public static String newGetHeadReq() {
-        return GET_HEAD + "\r\n";
+    /** Prefix that marks a request line as a failover command. */
+    private static final String MAGIC = "FailOver-CMD@";
+    /** Delimiter between the client ID and the command body. */
+    private static final String SEPARATOR = ":";
+
+    /**
+     * Builds a request line: magic prefix, client ID, separator, body, CRLF.
+     * Separators inside the client ID are replaced by '#', so the first
+     * separator of a request always terminates the client ID.
+     */
+    private static String newRequest(String clientID, String body) {
+        return MAGIC + (clientID == null ? "" : clientID.replace(SEPARATOR, "#")) + SEPARATOR + body + "\r\n";
+    }
+
+    public static String newGetHeadReq(String clientID) {
+        return newRequest(clientID, GET_HEAD);
+    }
+
+    public static String newGetSegmentReq(String clientID, String sid) {
+        return newRequest(clientID, GET_SEGMENT + sid);
+    }
+
+    /**
+     * Returns the command body of a request (everything after the first
+     * separator), or null if the payload is null, lacks the magic prefix,
+     * or lacks the separator.
+     */
+    public static String extractMessageFrom(String payload) {
+        int i = separatorIndex(payload);
+        return i < 0 ? null : payload.substring(i + 1);
     }
 
-    public static String newGetSegmentReq(SegmentId sid) {
-        return GET_SEGMENT + sid.toString() + "\r\n";
+    /**
+     * Returns the client ID of a request (the text between the magic prefix
+     * and the first separator), or null if the payload is null, lacks the
+     * magic prefix, or lacks the separator.
+     */
+    public static String extractClientFrom(String payload) {
+        int i = separatorIndex(payload);
+        return i < 0 ? null : payload.substring(MAGIC.length(), i);
+    }
+
+    /**
+     * Locates the separator that ends the client ID. Returns -1 when the
+     * payload is null, does not start with the magic prefix, or contains
+     * no separator after it.
+     */
+    private static int separatorIndex(String payload) {
+        if (payload == null || !payload.startsWith(MAGIC)) {
+            return -1;
+        }
+        return payload.indexOf(SEPARATOR, MAGIC.length());
     }
 }

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java Fri Oct 17 12:36:13 2014
@@ -19,6 +19,8 @@
 
 package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
 
+import java.io.IOException;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -46,7 +48,7 @@ public class RecordIdDecoder extends Len
             throws Exception {
         ByteBuf frame = (ByteBuf) super.decode(ctx, in);
         if (frame == null) {
-            return null;
+            throw new IOException("Received unexpected empty frame. Maybe you have enabled secure transmission on only one endpoint of the connection.");
         }
         byte type = frame.readByte();
         frame.discardReadBytes();

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java Fri Oct 17 12:36:13 2014
@@ -19,25 +19,38 @@
 
 package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
 
+import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder.EXTRA_HEADERS_LEN;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
 public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
 
     private static final Logger log = LoggerFactory
             .getLogger(SegmentDecoder.class);
 
+    /**
+     * the maximum possible size of a segment message: the largest segment plus the extra headers
+     */
+    private static final int MAX_LENGHT = Segment.MAX_SEGMENT_SIZE
+            + EXTRA_HEADERS_LEN;
+
     private final SegmentStore store;
 
     public SegmentDecoder(SegmentStore store) {
-        super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 4);
+        super(MAX_LENGHT, 0, 4, 0, 0);
         this.store = store;
     }
 
@@ -48,14 +61,26 @@ public class SegmentDecoder extends Leng
         if (frame == null) {
             return null;
         }
+        int len = frame.readInt();
         byte type = frame.readByte();
         long msb = frame.readLong();
         long lsb = frame.readLong();
-        frame.discardReadBytes();
-        SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
-        Segment s = new Segment(store.getTracker(), id, frame.nioBuffer());
-        log.debug("received type {} with id {} and size {}", type, id, s.size());
-        return s;
+        long hash = frame.readLong();
+        byte[] segment = new byte[len - 25];
+        frame.getBytes(29, segment);
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long check = hasher.putBytes(segment).hash().padToLong();
+        if (hash == check) {
+            SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
+            Segment s = new Segment(store.getTracker(), id,
+                    ByteBuffer.wrap(segment));
+            log.debug("received type {} with id {} and size {}", type, id,
+                    s.size());
+            return s;
+        }
+        log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+        return null;
+
     }
 
     @Override

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java Fri Oct 17 12:36:13 2014
@@ -20,27 +20,54 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 
+import java.io.ByteArrayOutputStream;
+
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
 public class SegmentEncoder extends MessageToByteEncoder<Segment> {
 
+    /**
+     * Size in bytes of the headers preceding the segment data in a message:
+     * 
+     * <pre>
+     *  - (4 bytes) the message length
+     *  - (1 byte ) a message type (not currently used)
+     *  - (8 bytes) segment id most significant bits
+     *  - (8 bytes) segment id least significant bits
+     *  - (8 bytes) checksum hash
+     * </pre>
+     */
+    static final int EXTRA_HEADERS_LEN = 29;
+
+    /**
+     * the header size excluding the 4-byte length field itself
+     */
+    private static final int EXTRA_HEADERS_WO_SIZE = EXTRA_HEADERS_LEN - 4;
+
     @Override
     protected void encode(ChannelHandlerContext ctx, Segment s, ByteBuf out)
             throws Exception {
         SegmentId id = s.getSegmentId();
-        int len = s.size() + 17;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(s.size());
+        s.writeTo(baos);
+        byte[] segment = baos.toByteArray();
+        // checksum is computed over exactly the bytes that are sent below
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long hash = hasher.putBytes(segment).hash().padToLong();
+
+        int len = segment.length + EXTRA_HEADERS_WO_SIZE;
         out.writeInt(len);
         out.writeByte(Messages.HEADER_SEGMENT);
         out.writeLong(id.getMostSignificantBits());
         out.writeLong(id.getLeastSignificantBits());
-        ByteBufOutputStream bout = new ByteBufOutputStream(out);
-        s.writeTo(bout);
-        bout.flush();
-        bout.close();
+        out.writeLong(hash);
+        out.writeBytes(segment);
     }
 }

Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java?rev=1632554&r1=1620634&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java Fri Oct 17 12:36:13 2014
@@ -39,4 +39,10 @@ public interface ObservablePartnerMBean 
     @CheckForNull
     @Description("Time the remote instance was last contacted")
     String getLastSeenTimestamp();
+
+    @Description("Number of transferred segments")
+    long getTransferredSegments();
+
+    @Description("Number of bytes stored in transferred segments")
+    long getTransferredSegmentBytes();
 }