You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/10/17 14:36:14 UTC
svn commit: r1632554 [1/2] - in /jackrabbit/oak/branches/1.0: ./ oak-core/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ oak-doc/
oak-doc/src/site/ oak-doc/src/site/markdown/coldstandby/ oak-run/
oak-run/src/main/java/org/apache/jac...
Author: alexparvulescu
Date: Fri Oct 17 12:36:13 2014
New Revision: 1632554
URL: http://svn.apache.org/r1632554
Log:
OAK-1915 TarMK Cold Standby
- merged to 1.0 branch
Added:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
- copied unchanged from r1622479, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
jackrabbit/oak/branches/1.0/oak-doc/src/site/markdown/coldstandby/
- copied from r1626168, jackrabbit/oak/trunk/oak-doc/src/site/markdown/coldstandby/
jackrabbit/oak/branches/1.0/oak-tarmk-failover/ (props changed)
- copied from r1606087, jackrabbit/oak/trunk/oak-tarmk-failover/
jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
- copied, changed from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
- copied, changed from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
- copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/
- copied from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ClientFailoverStatusMBean.java
- copied unchanged from r1626168, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ClientFailoverStatusMBean.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/resources/
- copied from r1625963, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/resources/
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
- copied unchanged from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/DebugSegmentStore.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
- copied, changed from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
- copied, changed from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
- copied, changed from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
- copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
- copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
- copied unchanged from r1622479, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
- copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
- copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
- copied, changed from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
Removed:
jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.config
jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.config
Modified:
jackrabbit/oak/branches/1.0/ (props changed)
jackrabbit/oak/branches/1.0/oak-core/pom.xml
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/branches/1.0/oak-doc/ (props changed)
jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml
jackrabbit/oak/branches/1.0/oak-run/README.md
jackrabbit/oak/branches/1.0/oak-run/pom.xml
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md
jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/RemoteSegmentLoader.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java
jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java
jackrabbit/oak/branches/1.0/pom.xml
Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
Merged /jackrabbit/oak/trunk:r1601878,1606087,1606708,1606711,1607031-1607032,1607331,1607366,1607392,1609165,1614593,1620634,1622479,1623364,1623969,1624551,1624973,1624994,1625025,1625036,1625916,1625963,1626021,1626053,1626163,1626168,1626175,1626265
Modified: jackrabbit/oak/branches/1.0/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/pom.xml?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/pom.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-core/pom.xml Fri Oct 17 12:36:13 2014
@@ -75,6 +75,7 @@
org.apache.jackrabbit.oak.plugins.observation.filter,
org.apache.jackrabbit.oak.plugins.segment,
org.apache.jackrabbit.oak.plugins.segment.http,
+ org.apache.jackrabbit.oak.plugins.segment.file,
org.apache.jackrabbit.oak.plugins.value,
org.apache.jackrabbit.oak.plugins.version,
org.apache.jackrabbit.oak.spi.commit,
Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Fri Oct 17 12:36:13 2014
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
@Component(policy = ConfigurationPolicy.REQUIRE)
public class SegmentNodeStoreService extends ProxyNodeStore
- implements Observable {
+ implements Observable, SegmentStoreProvider {
@Property(description="The unique name of this instance")
public static final String NAME = "name";
@@ -80,6 +80,8 @@ public class SegmentNodeStoreService ext
@Property(description = "TarMK compaction paused flag", boolValue = true)
public static final String PAUSE_COMPACTION = "pauseCompaction";
+ @Property(description = "Flag indicating that this component will not register as a NodeStore but just as a NodeStoreProvider", boolValue = false)
+ public static final String STANDBY = "standby";
/**
* Boolean value indicating a blobStore is to be used
*/
@@ -101,7 +103,8 @@ public class SegmentNodeStoreService ext
policy = ReferencePolicy.DYNAMIC)
private volatile BlobStore blobStore;
- private ServiceRegistration registration;
+ private ServiceRegistration storeRegistration;
+ private ServiceRegistration providerRegistration;
private Registration revisionGCRegistration;
private Registration blobGCRegistration;
private WhiteboardExecutor executor;
@@ -165,7 +168,12 @@ public class SegmentNodeStoreService ext
Dictionary<String, String> props = new Hashtable<String, String>();
props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
- registration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
+
+ boolean standby = toBoolean(lookup(context, STANDBY), false);
+ providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, props);
+ if (!standby) {
+ storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
+ }
OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
executor = new WhiteboardExecutor();
@@ -232,9 +240,13 @@ public class SegmentNodeStoreService ext
}
private void unregisterNodeStore() {
- if(registration != null){
- registration.unregister();
- registration = null;
+ if(providerRegistration != null){
+ providerRegistration.unregister();
+ providerRegistration = null;
+ }
+ if(storeRegistration != null){
+ storeRegistration.unregister();
+ storeRegistration = null;
}
if (revisionGCRegistration != null) {
revisionGCRegistration.unregister();
Propchange: jackrabbit/oak/branches/1.0/oak-doc/
------------------------------------------------------------------------------
Merged /jackrabbit/oak/trunk/oak-doc:r1626168,1626265
Modified: jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-doc/src/site/site.xml Fri Oct 17 12:36:13 2014
@@ -46,6 +46,8 @@ under the License.
<item href="known_issues.html" name="Known Issues" />
<item href="dos_and_donts.html" name="Dos and don'ts" />
<item href="when_things_go_wrong.html" name="When things go wrong" />
+ <item href="coldstandby/coldstandby.html" name="Cold Standby" />
+ <item href="FAQ.html" name="FAQ" />
</menu>
<menu name="Developing Oak">
<item href="dev_getting_started.html" name="Getting Started" />
Modified: jackrabbit/oak/branches/1.0/oak-run/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/README.md?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/README.md (original)
+++ jackrabbit/oak/branches/1.0/oak-run/README.md Fri Oct 17 12:36:13 2014
@@ -9,17 +9,24 @@ The following runmodes are currently ava
* debug : Print status information about an Oak repository.
* upgrade : Upgrade from Jackrabbit 2.x repository to Oak.
* server : Run the Oak Server
+ * syncmaster : Run a TarMK Cold Standby master
+ * syncslave : Run a TarMK Cold Standby slave
See the subsections below for more details on how to use these modes.
Backup
------
-The 'backup' mode creates a backup from an existing oak repository. To start this
-mode, use:
+The 'backup' mode creates a backup from an existing oak repository. To start this mode, use:
$ java -jar oak-run-*.jar backup /path/to/repository /path/to/backup
+Restore
+-------
+
+The 'restore' mode imports a backup of an existing oak repository. To start this mode, use:
+
+ $ java -jar oak-run-*.jar restore /path/to/repository /path/to/backup
Debug
-----
@@ -30,13 +37,40 @@ store. Currently this is only supported
$ java -jar oak-run-*.jar debug /path/to/oak/repository [id...]
+Syncmaster
+-------
+
+The 'syncmaster' mode starts a TarMK Cold Standby master listening on a TCP/IP port for connecting slaves.
+
+ $ java -jar oak-run-*.jar syncmaster [options] /path/to/TarMK
+
+The following options are available:
+
+ --port 8023 - port to listen at
+ --admissible 127.0.0.1 - admissible client IP range or host name
+ --secure - use secure connections
+
+Syncslave
+-------
+
+The 'syncslave' mode starts a TarMK Cold Standby slave to create or update a continuous backup from a Cold Standby master.
+
+ $ java -jar oak-run-*.jar syncslave [options] /path/to/TarMK
+
+The following options are available:
+
+ --port 8023 - port to connect to
+ --host 127.0.0.1 - host to connect to
+ --secure - use secure connections
+ --interval 5 - schedule the slave to run continuously, connecting every n seconds
+
Compact
-------
The 'compact' mode runs the segment compaction operation on the provided TarMK
repository. To start this mode, use:
- $ java -jar oak-run-*.jar compact /path/to/oak/repository
+ $ java -jar oak-run-*.jar compact /path/to/TarMK
Checkpoints
-----------
Modified: jackrabbit/oak/branches/1.0/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/pom.xml?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/pom.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-run/pom.xml Fri Oct 17 12:36:13 2014
@@ -140,6 +140,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-tarmk-failover</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.3.175</version>
@@ -176,7 +181,7 @@
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
- <version>4.4</version>
+ <version>4.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java Fri Oct 17 12:36:13 2014
@@ -23,6 +23,7 @@ import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
@@ -76,7 +77,7 @@ public class BenchmarkRunner {
.withOptionalArg().ofType(Boolean.class).defaultsTo(Boolean.FALSE);
OptionSpec<Integer> numberOfUsers = parser.accepts("numberOfUsers")
.withOptionalArg().ofType(Integer.class).defaultsTo(10000);
-
+ OptionSpec<String> nonOption = parser.nonOptions();
OptionSet options = parser.parse(args);
int cacheSize = cache.value(options);
RepositoryFixture[] allFixtures = new RepositoryFixture[] {
@@ -204,7 +205,7 @@ public class BenchmarkRunner {
flatStructure.value(options))
};
- Set<String> argset = Sets.newHashSet(options.nonOptionArguments());
+ Set<String> argset = Sets.newHashSet(nonOption.values(options));
List<RepositoryFixture> fixtures = Lists.newArrayList();
for (RepositoryFixture fixture : allFixtures) {
if (argset.remove(fixture.toString())) {
Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1632554&r1=1632553&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Fri Oct 17 12:36:13 2014
@@ -16,6 +16,9 @@
*/
package org.apache.jackrabbit.oak.run;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Arrays.asList;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -29,16 +32,12 @@ import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jcr.Repository;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
@@ -46,6 +45,7 @@ import joptsimple.OptionSpec;
import org.apache.jackrabbit.core.RepositoryContext;
import org.apache.jackrabbit.core.config.RepositoryConfig;
import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.ContentRepository;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
@@ -55,6 +55,8 @@ import org.apache.jackrabbit.oak.fixture
import org.apache.jackrabbit.oak.http.OakServlet;
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.plugins.backup.FileStoreBackup;
+import org.apache.jackrabbit.oak.plugins.backup.FileStoreRestore;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
@@ -62,6 +64,8 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -76,7 +80,11 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import static com.google.common.collect.Sets.newHashSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
public class Main {
@@ -104,6 +112,9 @@ public class Main {
case BACKUP:
backup(args);
break;
+ case RESTORE:
+ restore(args);
+ break;
case BENCHMARK:
BenchmarkRunner.main(args);
break;
@@ -119,6 +130,12 @@ public class Main {
case UPGRADE:
upgrade(args);
break;
+ case SYNCSLAVE:
+ syncSlave(args);
+ break;
+ case SYNCMASTER:
+ syncMaster(args);
+ break;
case CHECKPOINTS:
checkpoints(args);
break;
@@ -169,6 +186,222 @@ public class Main {
}
}
+ private static void restore(String[] args) throws IOException {
+ if (args.length == 2) {
+ // TODO: enable restore for other node store implementations
+ FileStore store = new FileStore(new File(args[0]), 256, false);
+ File target = new File(args[1]);
+ try {
+ FileStoreRestore.restore(target, new SegmentNodeStore(store));
+ } catch (CommitFailedException e) {
+ throw new IOException(e);
+ }
+ store.close();
+ } else {
+ System.err.println("usage: restore <repository> <backup>");
+ System.exit(1);
+ }
+ }
+
+ //TODO react to state changes of FailoverClient (triggered via JMX), once the state model of FailoverClient is complete.
+ private static class ScheduledSyncService extends AbstractScheduledService {
+
+ private final FailoverClient failoverClient;
+ private final int interval;
+
+ public ScheduledSyncService(FailoverClient failoverClient, int interval) {
+ this.failoverClient = failoverClient;
+ this.interval = interval;
+ }
+
+ @Override
+ public void runOneIteration() throws Exception {
+ failoverClient.run();
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedDelaySchedule(0, interval, TimeUnit.SECONDS);
+ }
+ }
+
+
+ private static void syncSlave(String[] args) throws Exception {
+
+ final String defaultHost = "127.0.0.1";
+ final int defaultPort = 8023;
+
+ final OptionParser parser = new OptionParser();
+ final OptionSpec<String> host = parser.accepts("host", "master host").withRequiredArg().ofType(String.class).defaultsTo(defaultHost);
+ final OptionSpec<Integer> port = parser.accepts("port", "master port").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort);
+ final OptionSpec<Integer> interval = parser.accepts("interval", "interval between successive executions").withRequiredArg().ofType(Integer.class);
+ final OptionSpec<Boolean> secure = parser.accepts("secure", "use secure connections").withRequiredArg().ofType(Boolean.class);
+ final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), "show help").forHelp();
+ final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCSLAVE + " <path to repository>");
+
+ final OptionSet options = parser.parse(args);
+ final List<String> nonOptions = nonOption.values(options);
+
+ if (options.has(help)) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
+
+ if (nonOptions.isEmpty()) {
+ parser.printHelpOn(System.err);
+ System.exit(1);
+ }
+
+ FileStore store = null;
+ FailoverClient failoverClient = null;
+ try {
+ store = new FileStore(new File(nonOptions.get(0)), 256);
+ failoverClient = new FailoverClient(
+ options.has(host)? options.valueOf(host) : defaultHost,
+ options.has(port)? options.valueOf(port) : defaultPort,
+ store,
+ options.has(secure) && options.valueOf(secure));
+ if (!options.has(interval)) {
+ failoverClient.run();
+ } else {
+ ScheduledSyncService syncService = new ScheduledSyncService(failoverClient, options.valueOf(interval));
+ syncService.startAsync();
+ syncService.awaitTerminated();
+ }
+ } finally {
+ if (store != null) {
+ store.close();
+ }
+ if (failoverClient != null) {
+ failoverClient.close();
+ }
+ }
+ }
+
+ private static void syncMaster(String[] args) throws Exception {
+
+ final int defaultPort = 8023;
+
+ final OptionParser parser = new OptionParser();
+ final OptionSpec<Integer> port = parser.accepts("port", "port to listen").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort);
+ final OptionSpec<Boolean> secure = parser.accepts("secure", "use secure connections").withRequiredArg().ofType(Boolean.class);
+ final OptionSpec<String> admissible = parser.accepts("admissible", "list of admissible slave host names or ip ranges").withRequiredArg().ofType(String.class);
+ final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), "show help").forHelp();
+ final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCMASTER + " <path to repository>");
+
+ final OptionSet options = parser.parse(args);
+ final List<String> nonOptions = nonOption.values(options);
+
+ if (options.has(help)) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
+
+ if (nonOptions.isEmpty()) {
+ parser.printHelpOn(System.err);
+ System.exit(1);
+ }
+
+
+ List<String> admissibleSlaves = options.has(admissible) ? options.valuesOf(admissible) : Collections.EMPTY_LIST;
+
+ FileStore store = null;
+ FailoverServer failoverServer = null;
+ try {
+ store = new FileStore(new File(nonOptions.get(0)), 256);
+ failoverServer = new FailoverServer(
+ options.has(port)? options.valueOf(port) : defaultPort,
+ store,
+ admissibleSlaves.toArray(new String[0]),
+ options.has(secure) && options.valueOf(secure));
+ failoverServer.startAndWait();
+ } finally {
+ if (store != null) {
+ store.close();
+ }
+ if (failoverServer != null) {
+ failoverServer.close();
+ }
+ }
+ }
+
+ public static NodeStore bootstrapNodeStore(String[] args, Closer closer,
+ String h) throws IOException {
+ //TODO add support for other NodeStore flags
+ OptionParser parser = new OptionParser();
+ OptionSpec<Integer> clusterId = parser
+ .accepts("clusterId", "MongoMK clusterId").withRequiredArg()
+ .ofType(Integer.class).defaultsTo(0);
+ OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"),
+ "show help").forHelp();
+ OptionSpec<String> nonOption = parser
+ .nonOptions(h);
+
+ OptionSet options = parser.parse(args);
+ List<String> nonOptions = nonOption.values(options);
+
+ if (options.has(help)) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
+
+ if (nonOptions.isEmpty()) {
+ parser.printHelpOn(System.err);
+ System.exit(1);
+ }
+
+ String src = nonOptions.get(0);
+ if (src.startsWith(MongoURI.MONGODB_PREFIX)) {
+ MongoClientURI uri = new MongoClientURI(src);
+ if (uri.getDatabase() == null) {
+ System.err.println("Database missing in MongoDB URI: "
+ + uri.getURI());
+ System.exit(1);
+ }
+ MongoConnection mongo = new MongoConnection(uri.getURI());
+ closer.register(asCloseable(mongo));
+ DocumentNodeStore store = new DocumentMK.Builder()
+ .setMongoDB(mongo.getDB())
+ .setClusterId(clusterId.value(options)).getNodeStore();
+ closer.register(asCloseable(store));
+ return store;
+ } else {
+ FileStore fs = new FileStore(new File(src), 256, false);
+ closer.register(asCloseable(fs));
+ return new SegmentNodeStore(fs);
+ }
+ }
+
+ private static Closeable asCloseable(final FileStore fs) {
+ return new Closeable() {
+
+ @Override
+ public void close() throws IOException {
+ fs.close();
+ }
+ };
+ }
+
+ private static Closeable asCloseable(final DocumentNodeStore dns) {
+ return new Closeable() {
+
+ @Override
+ public void close() throws IOException {
+ dns.dispose();
+ }
+ };
+ }
+
+ private static Closeable asCloseable(final MongoConnection con) {
+ return new Closeable() {
+
+ @Override
+ public void close() throws IOException {
+ con.close();
+ }
+ };
+ }
+
private static void compact(String[] args) throws IOException {
if (args.length != 1) {
System.err.println("usage: compact <path>");
@@ -456,10 +689,10 @@ public class Main {
private static void upgrade(String[] args) throws Exception {
OptionParser parser = new OptionParser();
parser.accepts("datastore", "keep data store");
-
+ OptionSpec<String> nonOption = parser.nonOptions();
OptionSet options = parser.parse(args);
- List<String> argList = options.nonOptionArguments();
+ List<String> argList = nonOption.values(options);
if (argList.size() == 2 || argList.size() == 3) {
File dir = new File(argList.get(0));
File xml = new File(dir, "repository.xml");
@@ -527,12 +760,12 @@ public class Main {
OptionSpec<Integer> port = parser.accepts("port", "MongoDB port").withRequiredArg().ofType(Integer.class).defaultsTo(27017);
OptionSpec<String> dbName = parser.accepts("db", "MongoDB database").withRequiredArg();
OptionSpec<Integer> clusterIds = parser.accepts("clusterIds", "Cluster Ids").withOptionalArg().ofType(Integer.class).withValuesSeparatedBy(',');
-
+ OptionSpec<String> nonOption = parser.nonOptions();
OptionSet options = parser.parse(args);
OakFixture oakFixture;
- List<String> arglist = options.nonOptionArguments();
+ List<String> arglist = nonOption.values(options);
String uri = (arglist.isEmpty()) ? defaultUri : arglist.get(0);
String fix = (arglist.size() <= 1) ? OakFixture.OAK_MEMORY : arglist.get(1);
@@ -672,11 +905,14 @@ public class Main {
public enum Mode {
BACKUP("backup"),
+ RESTORE("restore"),
BENCHMARK("benchmark"),
DEBUG("debug"),
COMPACT("compact"),
SERVER("server"),
UPGRADE("upgrade"),
+ SYNCSLAVE("syncSlave"),
+ SYNCMASTER("syncmaster"),
CHECKPOINTS("checkpoints");
private final String name;
Propchange: jackrabbit/oak/branches/1.0/oak-tarmk-failover/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Oct 17 12:36:13 2014
@@ -0,0 +1,5 @@
+target
+.*
+*.iml
+*.ipr
+*.iws
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/README.md Fri Oct 17 12:36:13 2014
@@ -7,8 +7,8 @@ Failover
The component should be installed when failover support is needed.
The setup is expected to be: one master to one or many slave nodes.
-The slave will periodically poll the master for the head state over http
-on a custom port, if it changed, it should pull in all the new segments.
+The slave will periodically poll the master for the head state; if it has
+changed, it will pull in all the new segments since the last sync.
Setup in OSGi
-------------
@@ -24,7 +24,7 @@ Master host represents the master host i
Interval represents how often the sync thread should run, in seconds.
See examples in the osgi-conf folder for each run mode. To install a new OSGI config in the sling launcher,
-you only need to create a new folder called 'install' in the sling.home folder and copy the configs there.
+you only need to create a new folder called 'install' in the sling.home folder and copy the specific config there.
TODO
----
@@ -32,7 +32,6 @@ TODO
- timeout handling doesn't cover everything on both server and slave
- error handling on the slave still has some issues (the slave hangs)
- maybe enable compression of the segments over the wire
- - maybe add a checksum to the segment encoder/decoder to verify the integrity of the transfer
- slave runmode could possibly be a read-only mode (no writes permitted)
License
Copied: jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?p2=jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&p1=jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&r1=1606708&r2=1632554&rev=1632554&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg Fri Oct 17 12:36:13 2014
@@ -1 +1 @@
-mode="master"
+mode=master
Copied: jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (from r1606708, jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?p2=jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&p1=jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg&r1=1606708&r2=1632554&rev=1632554&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg Fri Oct 17 12:36:13 2014
@@ -1,2 +1,2 @@
-mode="slave"
-master.host="127.0.0.1"
\ No newline at end of file
+mode=slave
+master.host=127.0.0.1
\ No newline at end of file
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/pom.xml Fri Oct 17 12:36:13 2014
@@ -11,14 +11,13 @@
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd ">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd ">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-parent</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.0.8-SNAPSHOT</version>
<relativePath>../oak-parent/pom.xml</relativePath>
</parent>
@@ -28,6 +27,7 @@
<description>Oak TarMK failover module</description>
<properties>
+ <netty-version>4.0.23.Final</netty-version>
</properties>
<build>
@@ -85,9 +85,30 @@
combine.children="append">
<failover.server.port>${failover.server.port}</failover.server.port>
</systemPropertyVariables>
+ <excludes>
+ <!-- excluding long running tests -->
+ <exclude>**/BrokenNetworkTest.java</exclude>
+ <exclude>**/FailoverIPRangeTest.java</exclude>
+ <exclude>**/BulkTest.java</exclude>
+ <exclude>**/MBeanTest.java</exclude>
+ </excludes>
</configuration>
</plugin>
</plugins>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg</exclude>
+ <exclude>osgi-conf/slave/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
<dependencies>
@@ -123,31 +144,31 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
- <version>4.0.20.Final</version>
+ <version>${netty-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
- <version>4.0.20.Final</version>
+ <version>${netty-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
- <version>4.0.20.Final</version>
+ <version>${netty-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
- <version>4.0.20.Final</version>
+ <version>${netty-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
- <version>4.0.20.Final</version>
+ <version>${netty-version}</version>
<scope>provided</scope>
</dependency>
@@ -170,6 +191,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
Copied: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java (from r1620634, jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java?p2=jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java&p1=jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java&r1=1620634&r2=1632554&rev=1632554&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java Fri Oct 17 12:36:13 2014
@@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
public class CommunicationObserver {
+ private static final int MAX_CLIENT_STATISTICS = 10;
private class CommunicationPartnerMBean implements ObservablePartnerMBean {
private final ObjectName mbeanName;
@@ -45,6 +46,8 @@ public class CommunicationObserver {
public Date lastSeen;
public String remoteAddress;
public int remotePort;
+ public long segmentsSent;
+ public long segmentBytesSent;
public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
this.clientName = clientName;
@@ -79,6 +82,16 @@ public class CommunicationObserver {
public String getLastSeenTimestamp() {
return this.lastSeen == null ? null : this.lastSeen.toString();
}
+
+ @Override
+ public long getTransferredSegments() {
+ return this.segmentsSent;
+ }
+
+ @Override
+ public long getTransferredSegmentBytes() {
+ return this.segmentBytesSent;
+ }
}
private static final Logger log = LoggerFactory
@@ -92,22 +105,28 @@ public class CommunicationObserver {
this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
}
- public void unregister() {
+ private void unregister(CommunicationPartnerMBean m) {
final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.unregisterMBean(m.getMBeanName());
+ }
+ catch (Exception e) {
+ log.error("error unregistering mbean for client '" + m.getName() + "'", e);
+ }
+ }
+
+ public void unregister() {
for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
- try {
- jmxServer.unregisterMBean(m.getMBeanName());
- }
- catch (Exception e) {
- log.error("error unregistering mbean for client '" + m.getName() + "'", e);
- }
+ unregister(m);
}
}
public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
+ log.debug("got message '" + request + "' from client " + client);
CommunicationPartnerMBean m = this.partnerDetails.get(client);
boolean register = false;
if (m == null) {
+ cleanUp();
m = new CommunicationPartnerMBean(client);
m.remoteAddress = remote.getAddress().getHostAddress();
m.remotePort = remote.getPort();
@@ -127,7 +146,35 @@ public class CommunicationObserver {
}
}
+ public void didSendSegmentBytes(String client, int size) {
+ log.debug("did send segment with " + size + " bytes to client " + client);
+ CommunicationPartnerMBean m = this.partnerDetails.get(client);
+ m.segmentsSent++;
+ m.segmentBytesSent += size;
+ this.partnerDetails.put(client, m);
+ }
+
public String getID() {
return this.identifier;
}
+
+ // helper
+
+ private void cleanUp() {
+ while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
+ CommunicationPartnerMBean oldestEntry = oldestEntry();
+ if (oldestEntry == null) return;
+ log.info("housekeeping: removing statistics for " + oldestEntry.getName());
+ unregister(oldestEntry);
+ this.partnerDetails.remove(oldestEntry.getName());
+ }
+ }
+
+ private CommunicationPartnerMBean oldestEntry() {
+ CommunicationPartnerMBean ret = null;
+ for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+ if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
+ }
+ return ret;
+ }
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Fri Oct 17 12:36:13 2014
@@ -27,22 +27,36 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.SnappyFramedDecoder;
import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.Closeable;
+import java.lang.management.ManagementFactory;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.ClientFailoverStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class FailoverClient implements Runnable, Closeable {
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import javax.net.ssl.SSLException;
+
+public final class FailoverClient implements ClientFailoverStatusMBean, Runnable, Closeable {
+ public static final String CLIENT_ID_PROPERTY_NAME = "failOverID";
private static final Logger log = LoggerFactory
.getLogger(FailoverClient.class);
@@ -52,70 +66,174 @@ public final class FailoverClient implem
private int readTimeoutMs = 10000;
private final FailoverStore store;
+ private final CommunicationObserver observer;
private FailoverClientHandler handler;
private EventLoopGroup group;
private EventExecutorGroup executor;
+ private SslContext sslContext;
+ private boolean active = false;
+ private boolean running;
+ private int failedRequests;
+ private long lastSuccessfulRequest;
+ private volatile String state;
+ private final Object sync = new Object();
+
+ public FailoverClient(String host, int port, SegmentStore store) throws SSLException {
+ this(host, port, store, false);
+ }
- public FailoverClient(String host, int port, SegmentStore store) {
+ public FailoverClient(String host, int port, SegmentStore store, boolean secure) throws SSLException {
+ this.state = STATUS_INITIALIZING;
+ this.lastSuccessfulRequest = -1;
+ this.failedRequests = 0;
this.host = host;
this.port = port;
+ if (secure) {
+ this.sslContext = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
+ }
this.store = new FailoverStore(store);
+ String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
+ this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s);
+
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.registerMBean(new StandardMBean(this, ClientFailoverStatusMBean.class), new ObjectName(this.getMBeanName()));
+ }
+ catch (Exception e) {
+ log.error("can register failover status mbean", e);
+ }
}
- public void run() {
+ public String getMBeanName() {
+ return FailoverStatusMBean.JMX_NAME + ",id=\"" + this.observer.getID() + "\"";
+ }
+
+ public void close() {
+ stop();
+ state = STATUS_CLOSING;
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.unregisterMBean(new ObjectName(this.getMBeanName()));
+ }
+ catch (Exception e) {
+ log.error("can unregister failover status mbean", e);
+ }
+ observer.unregister();
+ if (group != null && !group.isShuttingDown()) {
+ group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ if (executor != null && !executor.isShuttingDown()) {
+ executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ state = STATUS_CLOSED;
+ }
- this.executor = new DefaultEventExecutorGroup(4);
- this.handler = new FailoverClientHandler(this.store, executor);
+ public void run() {
- group = new NioEventLoopGroup();
- Bootstrap b = new Bootstrap();
- b.group(group);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, readTimeoutMs);
- b.option(ChannelOption.TCP_NODELAY, true);
- b.option(ChannelOption.SO_REUSEADDR, true);
- b.option(ChannelOption.SO_KEEPALIVE, true);
-
- b.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
-
- // p.addLast(new LoggingHandler(LogLevel.INFO));
- // Enable stream compression
- // p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
- // p.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
-
- // WriteTimeoutHandler & ReadTimeoutHandler
- p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
- readTimeoutMs, TimeUnit.MILLISECONDS));
-
- p.addLast(new StringEncoder(CharsetUtil.UTF_8));
- p.addLast(new RecordIdDecoder(store));
- p.addLast(executor, handler);
+ Bootstrap b;
+ synchronized (this.sync) {
+ if (this.active) {
+ return;
}
- });
+ state = STATUS_STARTING;
+ executor = new DefaultEventExecutorGroup(4);
+ handler = new FailoverClientHandler(this.store, executor, this.observer);
+ group = new NioEventLoopGroup();
+
+ b = new Bootstrap();
+ b.group(group);
+ b.channel(NioSocketChannel.class);
+ b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, readTimeoutMs);
+ b.option(ChannelOption.TCP_NODELAY, true);
+ b.option(ChannelOption.SO_REUSEADDR, true);
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+
+ b.handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ if (sslContext != null) {
+ p.addLast(sslContext.newHandler(ch.alloc()));
+ }
+ // WriteTimeoutHandler & ReadTimeoutHandler
+ p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
+ readTimeoutMs, TimeUnit.MILLISECONDS));
+ p.addLast(new StringEncoder(CharsetUtil.UTF_8));
+ p.addLast(new SnappyFramedDecoder(true));
+ p.addLast(new RecordIdDecoder(store));
+ p.addLast(executor, handler);
+ }
+ });
+ state = STATUS_RUNNING;
+ this.running = true;
+ this.active = true;
+ }
+
try {
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
+ this.failedRequests = 0;
+ this.lastSuccessfulRequest = System.currentTimeMillis() / 1000;
} catch (Exception e) {
+ this.failedRequests++;
log.error("Failed synchronizing state.", e);
+ stop();
} finally {
- close();
+ synchronized (this.sync) {
+ this.active = false;
+ }
}
}
@Override
- public void close() {
- if (group != null && !group.isShuttingDown()) {
- group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
- if (executor != null && !executor.isShuttingDown()) {
- executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
+ public String getMode() {
+ return "client: " + this.observer.getID();
+ }
+
+ @Override
+ public boolean isRunning() { return running;}
+
+ @Override
+ public void start() {
+ if (!running) run();
+ }
+
+ @Override
+ public void stop() {
+ //TODO running flag doesn't make sense this way, since run() is usually scheduled to be called repeatedly.
+ if (running) {
+ running = false;
+ state = STATUS_STOPPED;
}
}
+
+ @Override
+ public String getStatus() {
+ return this.state;
+ }
+
+ @Override
+ public int getFailedRequests() {
+ return this.failedRequests;
+ }
+
+ @Override
+ public int getSecondsSinceLastSuccess() {
+ if (this.lastSuccessfulRequest < 0) return -1;
+ return (int)(System.currentTimeMillis() / 1000 - this.lastSuccessfulRequest);
+ }
+
+ @Override
+ public int calcFailedRequests() {
+ return this.getFailedRequests();
+ }
+
+ @Override
+ public int calcSecondsSinceLastSuccess() {
+ return this.getSecondsSinceLastSuccess();
+ }
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java Fri Oct 17 12:36:13 2014
@@ -19,18 +19,17 @@
package org.apache.jackrabbit.oak.plugins.segment.failover.client;
import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetHeadReq;
+
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.failover.CommunicationObserver;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentDecoder;
import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
@@ -45,29 +44,31 @@ public class FailoverClientHandler exten
private final FailoverStore store;
private final EventExecutorGroup executor;
+ private final CommunicationObserver observer;
private EventExecutorGroup preloaderExecutor;
private EventExecutorGroup loaderExecutor;
private ChannelHandlerContext ctx;
- private Promise<RecordId> headPromise;
-
public FailoverClientHandler(final FailoverStore store,
- EventExecutorGroup executor) {
+ EventExecutorGroup executor, CommunicationObserver observer) {
this.store = store;
this.executor = executor;
+ this.observer = observer;
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
- sendHeadRequest();
+ log.debug("sending head request");
+ ctx.writeAndFlush(newGetHeadReq(this.observer.getID()));
+ log.debug("did send head request");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
throws Exception {
- headPromise.setSuccess(msg);
+ setHead(msg);
};
@Override
@@ -75,28 +76,7 @@ public class FailoverClientHandler exten
ctx.flush();
}
- private synchronized void sendHeadRequest() {
- headPromise = ctx.executor().newPromise();
- headPromise.addListener(new GenericFutureListener<Future<RecordId>>() {
- @Override
- public void operationComplete(Future<RecordId> future) {
- if (future.isSuccess()) {
- try {
- setHead(future.get());
- } catch (Exception e) {
- exceptionCaught(ctx, e);
- }
- } else {
- exceptionCaught(ctx, future.cause());
- }
- }
- });
- ctx.writeAndFlush(newGetHeadReq()).addListener(
- new FailedRequestListener(headPromise));
- }
-
synchronized void setHead(RecordId head) {
- headPromise = null;
if (store.getHead().getRecordId().equals(head)) {
// all sync'ed up
@@ -104,6 +84,8 @@ public class FailoverClientHandler exten
ctx.close();
return;
}
+
+ log.debug("updating current head to " + head);
ctx.pipeline().remove(RecordIdDecoder.class);
ctx.pipeline().remove(this);
ctx.pipeline().addLast(new SegmentDecoder(store));
@@ -114,17 +96,12 @@ public class FailoverClientHandler exten
loaderExecutor = new DefaultEventExecutorGroup(4);
SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
- preloaderExecutor, loaderExecutor);
+ preloaderExecutor, loaderExecutor, this.observer.getID());
ctx.pipeline().addLast(loaderExecutor, h2);
h1.channelActive(ctx);
h2.channelActive(ctx);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- log.error("Failed synchronizing state.", cause);
- close();
+ log.debug("updating current head finished");
}
@Override
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java Fri Oct 17 12:36:13 2014
@@ -19,21 +19,21 @@
package org.apache.jackrabbit.oak.plugins.segment.failover.client;
import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetSegmentReq;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.EventExecutorGroup;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
import org.apache.jackrabbit.oak.plugins.segment.failover.store.RemoteSegmentLoader;
@@ -48,6 +48,7 @@ public class SegmentLoaderHandler extend
.getLogger(SegmentLoaderHandler.class);
private final FailoverStore store;
+ private final String clientID;
private final RecordId head;
private final EventExecutorGroup preloaderExecutor;
private final EventExecutorGroup loaderExecutor;
@@ -60,11 +61,13 @@ public class SegmentLoaderHandler extend
public SegmentLoaderHandler(final FailoverStore store, RecordId head,
EventExecutorGroup preloaderExecutor,
- EventExecutorGroup loaderExecutor) {
+ EventExecutorGroup loaderExecutor,
+ String clientID) {
this.store = store;
this.head = head;
this.preloaderExecutor = preloaderExecutor;
this.loaderExecutor = loaderExecutor;
+ this.clientID = clientID;
}
@Override
@@ -77,12 +80,13 @@ public class SegmentLoaderHandler extend
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof SegmentReply) {
+ //log.debug("offering segment " + ((SegmentReply) evt).getSegment());
segment.offer(((SegmentReply) evt).getSegment());
}
}
private void initSync() {
- log.debug("new head id " + head);
+ log.info("new head id " + head);
long t = System.currentTimeMillis();
try {
@@ -91,38 +95,66 @@ public class SegmentLoaderHandler extend
SegmentNodeBuilder builder = before.builder();
SegmentNodeState current = new SegmentNodeState(head);
- current.compareAgainstBaseState(before, new ApplyDiff(builder));
-
+ do {
+ try {
+ current.compareAgainstBaseState(before, new ApplyDiff(builder));
+ break;
+ }
+ catch (SegmentNotFoundException e) {
+ // the segment is locally damaged or not present anymore
+ // lets try to read this from the master again
+ String id = e.getSegmentId();
+ Segment s = readSegment(e.getSegmentId());
+ if (s == null) {
+ log.warn("can't read locally corrupt segment " + id + " from master");
+ throw e;
+ }
+
+ log.info("did reread locally corrupt segment " + id + " with size " + s.size());
+ ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size());
+ try {
+ s.writeTo(bout);
+ }
+ catch (IOException f) {
+ log.error("can't wrap segment to output stream", f);
+ throw e;
+ }
+ store.writeSegment(s.getSegmentId(), bout.toByteArray(), 0, s.size());
+ }
+ } while(true);
boolean ok = store.setHead(before, builder.getNodeState());
log.info("#updated state (set head {}) in {}ms.", ok,
System.currentTimeMillis() - t);
} finally {
close();
}
+ log.debug("returning initSync");
}
@Override
- public Segment readSegment(final SegmentId id) {
- ctx.writeAndFlush(newGetSegmentReq(id)).addListener(reqListener);
+ public Segment readSegment(final String id) {
+ ctx.writeAndFlush(newGetSegmentReq(this.clientID, id));
return getSegment();
}
- private final ChannelFutureListener reqListener = new ChannelFutureListener() {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ log.warn("Closing channel. Got exception: " + cause);
+ ctx.close();
+ }
- @Override
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- exceptionCaught(ctx, future.cause());
- }
- }
- };
+ // implementation of RemoteSegmentLoader
public Segment getSegment() {
boolean interrupted = false;
try {
for (;;) {
try {
- return segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ log.debug("polling segment");
+ Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ log.debug("returning segment " + s);
+ return s;
} catch (InterruptedException ignore) {
interrupted = true;
}
@@ -132,15 +164,9 @@ public class SegmentLoaderHandler extend
Thread.currentThread().interrupt();
}
}
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- log.error("Failed synchronizing state.", cause);
- close();
}
- @Override
public void close() {
ctx.close();
if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
@@ -153,7 +179,6 @@ public class SegmentLoaderHandler extend
}
}
- @Override
public boolean isClosed() {
return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor
.isShutdown()));
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java Fri Oct 17 12:36:13 2014
@@ -23,13 +23,18 @@ import io.netty.channel.SimpleChannelInb
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SegmentPreLoaderHandler extends
SimpleChannelInboundHandler<Segment> {
+ private static final Logger log = LoggerFactory
+ .getLogger(SegmentPreLoaderHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Segment msg)
throws Exception {
+ log.info("fire new segment reply for " + msg.getSegmentId());
ctx.fireUserEventTriggered(new SegmentReply(msg));
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java Fri Oct 17 12:36:13 2014
@@ -18,22 +18,43 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-
public class Messages {
public static final byte HEADER_RECORD = 0x00;
public static final byte HEADER_SEGMENT = 0x01;
public static final String GET_HEAD = "h";
-
public static final String GET_SEGMENT = "s.";
- public static String newGetHeadReq() {
- return GET_HEAD + "\r\n";
+ private static final String MAGIC = "FailOver-CMD@";
+ private static final String SEPARATOR = ":";
+
+ private static String newRequest(String clientID, String body) {
+ return MAGIC + (clientID == null ? "" : clientID.replace(SEPARATOR, "#")) + SEPARATOR + body + "\r\n";
+ }
+
+ public static String newGetHeadReq(String clientID) {
+ return newRequest(clientID, GET_HEAD);
+ }
+
+ public static String newGetSegmentReq(String clientID, String sid) {
+ return newRequest(clientID, GET_SEGMENT + sid);
+ }
+
+ public static String extractMessageFrom(String payload) {
+ if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
+ int i = payload.indexOf(SEPARATOR);
+ return payload.substring(i + 1);
+ }
+ return null;
}
- public static String newGetSegmentReq(SegmentId sid) {
- return GET_SEGMENT + sid.toString() + "\r\n";
+ public static String extractClientFrom(String payload) {
+ if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
+ payload = payload.substring(MAGIC.length());
+ int i = payload.indexOf(SEPARATOR);
+ return payload.substring(0, i);
+ }
+ return null;
}
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java Fri Oct 17 12:36:13 2014
@@ -19,6 +19,8 @@
package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+import java.io.IOException;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -46,7 +48,7 @@ public class RecordIdDecoder extends Len
throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
- return null;
+ throw new IOException("Received unexpected empty frame. Maybe you have enabled secure transmission on only one endpoint of the connection.");
}
byte type = frame.readByte();
frame.discardReadBytes();
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java Fri Oct 17 12:36:13 2014
@@ -19,25 +19,38 @@
package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder.EXTRA_HEADERS_LEN;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory
.getLogger(SegmentDecoder.class);
+ /**
+ * the maximum possible size of a segment message frame (largest segment plus the extra header bytes)
+ */
+ private static final int MAX_LENGHT = Segment.MAX_SEGMENT_SIZE
+ + EXTRA_HEADERS_LEN;
+
private final SegmentStore store;
public SegmentDecoder(SegmentStore store) {
- super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 4);
+ super(MAX_LENGHT, 0, 4, 0, 0);
this.store = store;
}
@@ -48,14 +61,26 @@ public class SegmentDecoder extends Leng
if (frame == null) {
return null;
}
+ int len = frame.readInt();
byte type = frame.readByte();
long msb = frame.readLong();
long lsb = frame.readLong();
- frame.discardReadBytes();
- SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
- Segment s = new Segment(store.getTracker(), id, frame.nioBuffer());
- log.debug("received type {} with id {} and size {}", type, id, s.size());
- return s;
+ long hash = frame.readLong();
+ byte[] segment = new byte[len - 25];
+ frame.getBytes(29, segment);
+ Hasher hasher = Hashing.murmur3_32().newHasher();
+ long check = hasher.putBytes(segment).hash().padToLong();
+ if (hash == check) {
+ SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
+ Segment s = new Segment(store.getTracker(), id,
+ ByteBuffer.wrap(segment));
+ log.debug("received type {} with id {} and size {}", type, id,
+ s.size());
+ return s;
+ }
+ log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+ return null;
+
}
@Override
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java?rev=1632554&r1=1606087&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java Fri Oct 17 12:36:13 2014
@@ -20,27 +20,54 @@
package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.ByteArrayOutputStream;
+
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
public class SegmentEncoder extends MessageToByteEncoder<Segment> {
+ /**
+ * A segment message starts with the following header fields, followed by the segment data:
+ *
+ * <pre>
+ * - (4 bytes) the message length
+ * - (1 byte ) a message type (not currently used)
+ * - (8 bytes) segment id most significant bits
+ * - (8 bytes) segment id least significant bits
+ * - (8 bytes) checksum hash
+ * </pre>
+ */
+ static int EXTRA_HEADERS_LEN = 29;
+
+ /**
+ * the header size not including the 4-byte message length field
+ */
+ private int EXTRA_HEADERS_WO_SIZE = EXTRA_HEADERS_LEN - 4;
+
@Override
protected void encode(ChannelHandlerContext ctx, Segment s, ByteBuf out)
throws Exception {
SegmentId id = s.getSegmentId();
- int len = s.size() + 17;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(s.size());
+ s.writeTo(baos);
+ byte[] segment = baos.toByteArray();
+
+ Hasher hasher = Hashing.murmur3_32().newHasher();
+ long hash = hasher.putBytes(segment).hash().padToLong();
+
+ int len = segment.length + EXTRA_HEADERS_WO_SIZE;
out.writeInt(len);
out.writeByte(Messages.HEADER_SEGMENT);
out.writeLong(id.getMostSignificantBits());
out.writeLong(id.getLeastSignificantBits());
- ByteBufOutputStream bout = new ByteBufOutputStream(out);
- s.writeTo(bout);
- bout.flush();
- bout.close();
+ out.writeLong(hash);
+ out.writeBytes(segment);
}
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java?rev=1632554&r1=1620634&r2=1632554&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java Fri Oct 17 12:36:13 2014
@@ -39,4 +39,10 @@ public interface ObservablePartnerMBean
@CheckForNull
@Description("Time the remote instance was last contacted")
String getLastSeenTimestamp();
+
+ @Description("Number of transferred segments")
+ long getTransferredSegments();
+
+ @Description("Number of bytes stored in transferred segments")
+ long getTransferredSegmentBytes();
}