You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/12/11 17:06:20 UTC
svn commit: r1644689 [1/2] - in /jackrabbit/oak/trunk/oak-tarmk-standby: ./
src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/
src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/
src/main/java/org/apache/jackrabb...
Author: alexparvulescu
Date: Thu Dec 11 16:06:19 2014
New Revision: 1644689
URL: http://svn.apache.org/r1644689
Log:
OAK-2347 TarMK Cold Standby FSDS mirroring
Added:
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java (with props)
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java (with props)
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java (with props)
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java (with props)
jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java (with props)
jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java (with props)
Removed:
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentPreLoaderHandler.java
Modified:
jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java (contents, props changed)
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java (contents, props changed)
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java (contents, props changed)
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java (contents, props changed)
jackrabbit/oak/trunk/oak-tarmk-standby/src/test/resources/logback-test.xml
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml Thu Dec 11 16:06:19 2014
@@ -141,6 +141,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -202,5 +208,11 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-data</artifactId>
+ <version>2.9.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java Thu Dec 11 16:06:19 2014
@@ -18,6 +18,7 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetBlobReq;
import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -28,16 +29,17 @@ import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentReply;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
-import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
+import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,24 +52,23 @@ public class SegmentLoaderHandler extend
private final StandbyStore store;
private final String clientID;
private final RecordId head;
- private final EventExecutorGroup preloaderExecutor;
private final EventExecutorGroup loaderExecutor;
+ private final AtomicBoolean running;
- private int timeoutMs = 5000;
+ private int timeoutMs = 120000;
private ChannelHandlerContext ctx;
- final BlockingQueue<Segment> segment = new LinkedBlockingQueue<Segment>();
+ final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();
public SegmentLoaderHandler(final StandbyStore store, RecordId head,
- EventExecutorGroup preloaderExecutor,
EventExecutorGroup loaderExecutor,
- String clientID) {
+ String clientID, AtomicBoolean running) {
this.store = store;
this.head = head;
- this.preloaderExecutor = preloaderExecutor;
this.loaderExecutor = loaderExecutor;
this.clientID = clientID;
+ this.running = running;
}
@Override
@@ -80,8 +81,7 @@ public class SegmentLoaderHandler extend
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof SegmentReply) {
- //log.debug("offering segment " + ((SegmentReply) evt).getSegment());
- segment.offer(((SegmentReply) evt).getSegment());
+ segment.offer((SegmentReply) evt);
}
}
@@ -97,10 +97,10 @@ public class SegmentLoaderHandler extend
SegmentNodeState current = new SegmentNodeState(head);
do {
try {
- current.compareAgainstBaseState(before, new ApplyDiff(builder));
+ current.compareAgainstBaseState(before,
+ new StandbyApplyDiff(builder, store, this));
break;
- }
- catch (SegmentNotFoundException e) {
+ } catch (SegmentNotFoundException e) {
// the segment is locally damaged or not present anymore
// let's try to read this from the primary again
String id = e.getSegmentId();
@@ -114,8 +114,7 @@ public class SegmentLoaderHandler extend
ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size());
try {
s.writeTo(bout);
- }
- catch (IOException f) {
+ } catch (IOException f) {
log.error("can't wrap segment to output stream", f);
throw e;
}
@@ -137,25 +136,50 @@ public class SegmentLoaderHandler extend
}
@Override
+ public Blob readBlob(String blobId) {
+ ctx.writeAndFlush(newGetBlobReq(this.clientID, blobId));
+ return getBlob(blobId);
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- log.warn("Closing channel. Got exception: " + cause);
- ctx.close();
+ log.error("Exception caught, closing channel.", cause);
+ close();
+ }
+
+ private Segment getSegment(final String id) {
+ return getReply(id, SegmentReply.SEGMENT).getSegment();
}
- // implementation of RemoteSegmentLoader
+ private Blob getBlob(final String id) {
+ return getReply(id, SegmentReply.BLOB).getBlob();
+ }
- public Segment getSegment(final String id) {
+ private SegmentReply getReply(final String id, int type) {
boolean interrupted = false;
try {
for (;;) {
try {
- Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
- if (s == null) {
- return null;
+ SegmentReply r = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ if (r == null) {
+ log.warn("timeout waiting for {}", id);
+ return SegmentReply.empty();
}
- if (s.getSegmentId().toString().equals(id)) {
- return s;
+ if (r.getType() == type) {
+ switch (r.getType()) {
+ case SegmentReply.SEGMENT:
+ if (r.getSegment().getSegmentId().toString()
+ .equals(id)) {
+ return r;
+ }
+ break;
+ case SegmentReply.BLOB:
+ if (r.getBlob().getBlobId().equals(id)) {
+ return r;
+ }
+ break;
+ }
}
} catch (InterruptedException ignore) {
interrupted = true;
@@ -166,24 +190,26 @@ public class SegmentLoaderHandler extend
Thread.currentThread().interrupt();
}
}
-
}
+ @Override
public void close() {
ctx.close();
- if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
- preloaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
loaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
.syncUninterruptibly();
}
}
+ @Override
public boolean isClosed() {
return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor
.isShutdown()));
}
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
}
Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,156 @@
+package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+
+import static org.apache.jackrabbit.oak.api.Type.BINARIES;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class StandbyApplyDiff implements NodeStateDiff {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(StandbyApplyDiff.class);
+
+ private final NodeBuilder builder;
+
+ private final SegmentStore store;
+
+ private final RemoteSegmentLoader loader;
+
+ private final String path;
+
+ public StandbyApplyDiff(NodeBuilder builder, SegmentStore store,
+ RemoteSegmentLoader loader) {
+ this(builder, store, loader, "/");
+ }
+
+ private StandbyApplyDiff(NodeBuilder builder, SegmentStore store,
+ RemoteSegmentLoader loader, String path) {
+ this.builder = builder;
+ this.store = store;
+ this.loader = loader;
+ this.path = path;
+ if (log.isTraceEnabled()) {
+ if (PathUtils.getDepth(path) < 5) {
+ log.trace("running diff on {}", path);
+ }
+ }
+ }
+
+ @Override
+ public boolean propertyAdded(PropertyState after) {
+ if (!loader.isRunning()) {
+ return false;
+ }
+ builder.setProperty(binaryCheck(after));
+ return true;
+ }
+
+ @Override
+ public boolean propertyChanged(PropertyState before, PropertyState after) {
+ if (!loader.isRunning()) {
+ return false;
+ }
+ builder.setProperty(binaryCheck(after));
+ return true;
+ }
+
+ @Override
+ public boolean propertyDeleted(PropertyState before) {
+ if (!loader.isRunning()) {
+ return false;
+ }
+ builder.removeProperty(before.getName());
+ return true;
+ }
+
+ private PropertyState binaryCheck(PropertyState property) {
+ Type<?> type = property.getType();
+ if (type == BINARY) {
+ binaryCheck(property.getValue(Type.BINARY), property.getName());
+ } else if (type == BINARIES) {
+ for (Blob blob : property.getValue(BINARIES)) {
+ binaryCheck(blob, property.getName());
+ }
+ }
+ return property;
+ }
+
+ private void binaryCheck(Blob b, String pName) {
+ if (b instanceof SegmentBlob) {
+ SegmentBlob sb = (SegmentBlob) b;
+ // verify if the blob exists
+ if (sb.isExternal() && b.getReference() == null) {
+ String blobId = sb.getBlobId();
+ if (blobId != null) {
+ readBlob(blobId, pName);
+ }
+ }
+ } else {
+ log.warn("Unknown Blob {} at {}, ignoring", b.getClass().getName(),
+ path + "#" + pName);
+ }
+ }
+
+ private void readBlob(String blobId, String pName) {
+ Blob read = loader.readBlob(blobId);
+ if (read != null) {
+ try {
+ store.getBlobStore().writeBlob(read.getNewStream());
+ } catch (IOException f) {
+ throw new IllegalStateException("Unable to persist blob "
+ + blobId + " at " + path + "#" + pName, f);
+ }
+ } else {
+ throw new IllegalStateException("Unable to load remote blob "
+ + blobId + " at " + path + "#" + pName);
+ }
+ }
+
+ @Override
+ public boolean childNodeAdded(String name, NodeState after) {
+ if (!loader.isRunning()) {
+ return false;
+ }
+ NodeBuilder child = EmptyNodeState.EMPTY_NODE.builder();
+ boolean success = EmptyNodeState.compareAgainstEmptyState(after,
+ new StandbyApplyDiff(child, store, loader, path + name + "/"));
+ if (success) {
+ builder.setChildNode(name, child.getNodeState());
+ }
+ return success;
+ }
+
+ @Override
+ public boolean childNodeChanged(String name, NodeState before,
+ NodeState after) {
+ if (!loader.isRunning()) {
+ return false;
+ }
+
+ return after.compareAgainstBaseState(before, new StandbyApplyDiff(
+ builder.getChildNode(name), store, loader, path + name + "/"));
+ }
+
+ @Override
+ public boolean childNodeDeleted(String name, NodeState before) {
+ if (!loader.isRunning()) {
+ return false;
+ }
+ builder.getChildNode(name).remove();
+ return true;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java Thu Dec 11 16:06:19 2014
@@ -40,6 +40,7 @@ import java.io.Closeable;
import java.lang.management.ManagementFactory;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
@@ -72,12 +73,13 @@ public final class StandbyClient impleme
private EventExecutorGroup executor;
private SslContext sslContext;
private boolean active = false;
- private boolean running;
private int failedRequests;
private long lastSuccessfulRequest;
private volatile String state;
private final Object sync = new Object();
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
public StandbyClient(String host, int port, SegmentStore store) throws SSLException {
this(host, port, store, false);
}
@@ -124,6 +126,10 @@ public final class StandbyClient impleme
}
public void run() {
+ if (!isRunning()) {
+ // manually stopped
+ return;
+ }
Bootstrap b;
synchronized (this.sync) {
@@ -132,7 +138,7 @@ public final class StandbyClient impleme
}
state = STATUS_STARTING;
executor = new DefaultEventExecutorGroup(4);
- handler = new StandbyClientHandler(this.store, executor, this.observer);
+ handler = new StandbyClientHandler(this.store, executor, this.observer, this.running);
group = new NioEventLoopGroup();
b = new Bootstrap();
@@ -160,7 +166,6 @@ public final class StandbyClient impleme
}
});
state = STATUS_RUNNING;
- this.running = true;
this.active = true;
}
@@ -200,20 +205,19 @@ public final class StandbyClient impleme
}
@Override
- public boolean isRunning() { return running;}
+ public boolean isRunning() {
+ return running.get();
+ }
@Override
public void start() {
- if (!running) run();
+ running.set(true);
}
@Override
public void stop() {
- //TODO running flag doesn't make sense this way, since run() is usually scheduled to be called repeatedly.
- if (running) {
- running = false;
- state = STATUS_STOPPED;
- }
+ running.set(false);
+ state = STATUS_STOPPED;
}
@Override
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java Thu Dec 11 16:06:19 2014
@@ -26,17 +26,18 @@ import io.netty.util.concurrent.EventExe
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentDecoder;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StandbyClientHandler extends
- SimpleChannelInboundHandler<RecordId> implements Closeable {
+public class StandbyClientHandler extends SimpleChannelInboundHandler<RecordId>
+ implements Closeable {
private static final Logger log = LoggerFactory
.getLogger(StandbyClientHandler.class);
@@ -44,16 +45,18 @@ public class StandbyClientHandler extend
private final StandbyStore store;
private final EventExecutorGroup executor;
private final CommunicationObserver observer;
- private EventExecutorGroup preloaderExecutor;
- private EventExecutorGroup loaderExecutor;
+ private final AtomicBoolean running;
+ private EventExecutorGroup loaderExecutor;
private ChannelHandlerContext ctx;
public StandbyClientHandler(final StandbyStore store,
- EventExecutorGroup executor, CommunicationObserver observer) {
+ EventExecutorGroup executor, CommunicationObserver observer,
+ AtomicBoolean running) {
this.store = store;
this.executor = executor;
this.observer = observer;
+ this.running = running;
}
@Override
@@ -87,33 +90,31 @@ public class StandbyClientHandler extend
log.debug("updating current head to " + head);
ctx.pipeline().remove(RecordIdDecoder.class);
ctx.pipeline().remove(this);
- ctx.pipeline().addLast(new SegmentDecoder(store));
-
- preloaderExecutor = new DefaultEventExecutorGroup(4);
- SegmentPreLoaderHandler h1 = new SegmentPreLoaderHandler();
- ctx.pipeline().addLast(preloaderExecutor, h1);
+ ctx.pipeline().addLast(new ReplyDecoder(store));
loaderExecutor = new DefaultEventExecutorGroup(4);
SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
- preloaderExecutor, loaderExecutor, this.observer.getID());
+ loaderExecutor, this.observer.getID(), running);
ctx.pipeline().addLast(loaderExecutor, h2);
- h1.channelActive(ctx);
h2.channelActive(ctx);
log.debug("updating current head finished");
}
@Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ log.error("Exception caught, closing channel.", cause);
+ close();
+ }
+
+ @Override
public void close() {
ctx.close();
if (!executor.isShuttingDown()) {
executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
.syncUninterruptibly();
}
- if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
- preloaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
loaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
.syncUninterruptibly();
Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+public class BlobEncoder extends MessageToByteEncoder<Blob> {
+
+ // TODO
+ // if transferring large binaries turns out to be too intensive look into
+ // using a ChunkedWriteHandler and a new ChunkedStream(Blob.getNewStream())
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, Blob b, ByteBuf out)
+ throws Exception {
+ byte[] bytes = null;
+ InputStream s = b.getNewStream();
+ try {
+ bytes = IOUtils.toByteArray(s);
+ } finally {
+ s.close();
+ }
+
+ Hasher hasher = Hashing.murmur3_32().newHasher();
+ long hash = hasher.putBytes(bytes).hash().padToLong();
+
+ out.writeInt(bytes.length);
+ out.writeByte(Messages.HEADER_BLOB);
+
+ String bid = b.getContentIdentity();
+ byte[] id = bid.getBytes(Charset.forName("UTF-8"));
+ out.writeInt(id.length);
+ out.writeBytes(id);
+
+ out.writeLong(hash);
+ out.writeBytes(bytes);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+
+import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
+
+public class IdArrayBasedBlob extends ArrayBasedBlob {
+
+ private final String blobId;
+
+ public IdArrayBasedBlob(byte[] value, String blobId) {
+ super(value);
+ this.blobId = blobId;
+ }
+
+ public String getBlobId() {
+ return blobId;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java Thu Dec 11 16:06:19 2014
@@ -22,9 +22,11 @@ public class Messages {
public static final byte HEADER_RECORD = 0x00;
public static final byte HEADER_SEGMENT = 0x01;
+ public static final byte HEADER_BLOB = 0x02;
public static final String GET_HEAD = "h";
public static final String GET_SEGMENT = "s.";
+ public static final String GET_BLOB = "b.";
private static final String MAGIC = "Standby-CMD@";
private static final String SEPARATOR = ":";
@@ -41,6 +43,10 @@ public class Messages {
return newRequest(clientID, GET_SEGMENT + sid);
}
+ public static String newGetBlobReq(String clientID, String blobId) {
+ return newRequest(clientID, GET_BLOB + blobId);
+ }
+
public static String extractMessageFrom(String payload) {
if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
int i = payload.indexOf(SEPARATOR);
Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder.DecodingState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ * Decodes the binary replies read from the channel and publishes them as
+ * {@link SegmentReply} user events.
+ * <p>
+ * A reply starts with an {@code int} length and a one-byte type
+ * ({@link Messages#HEADER_SEGMENT} or {@link Messages#HEADER_BLOB}) followed
+ * by a type-specific body. Each body carries a murmur3 hash of its payload; a
+ * payload whose hash does not match is logged and dropped.
+ */
+public class ReplyDecoder extends ReplayingDecoder<DecodingState> {
+
+    /** The part of a reply the decoder expects to read next. */
+    public enum DecodingState {
+        HEADER, SEGMENT, BLOB
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(ReplyDecoder.class);
+
+    /**
+     * Number of bytes subtracted from the frame length to get the size of a
+     * segment payload. It matches what is read ahead of the payload: the type
+     * byte plus the msb, lsb and hash longs.
+     */
+    private static final int SEGMENT_HEADER_LEN = 25;
+
+    private final SegmentStore store;
+
+    // length and type of the reply being decoded; -1 between replies
+    private int length = -1;
+    private byte type = -1;
+
+    /**
+     * @param store store whose tracker is used to create the decoded segments
+     */
+    public ReplyDecoder(SegmentStore store) {
+        super(DecodingState.HEADER);
+        this.store = store;
+    }
+
+    /** Returns to the HEADER state and forgets the current length and type. */
+    private void reset() {
+        checkpoint(DecodingState.HEADER);
+        length = -1;
+        type = -1;
+    }
+
+    /**
+     * Reads either a reply header or a reply body, depending on the current
+     * state. For every body that passes its hash check an empty
+     * {@link SegmentReply} is added to {@code out} and the decoded reply is
+     * fired as a user event.
+     *
+     * @throws Exception if the header carries an unknown type or the decoder
+     *             is in an unknown state
+     */
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
+            List<Object> out) throws Exception {
+
+        switch (state()) {
+        case HEADER: {
+            length = in.readInt();
+            type = in.readByte();
+            switch (type) {
+            case Messages.HEADER_SEGMENT:
+                checkpoint(DecodingState.SEGMENT);
+                break;
+            case Messages.HEADER_BLOB:
+                checkpoint(DecodingState.BLOB);
+                break;
+            default:
+                throw new Exception("Unknown type: " + type);
+            }
+            return;
+        }
+
+        case SEGMENT: {
+            Segment s = decodeSegment(in, length, type);
+            if (s != null) {
+                out.add(SegmentReply.empty());
+                ctx.fireUserEventTriggered(new SegmentReply(s));
+            }
+            // The body has been consumed whether or not it passed the hash
+            // check, so always go back to HEADER. Staying in SEGMENT would
+            // make the next reply's header be read as segment payload.
+            reset();
+            return;
+        }
+
+        case BLOB: {
+            IdArrayBasedBlob b = decodeBlob(in, length, type);
+            if (b != null) {
+                out.add(SegmentReply.empty());
+                ctx.fireUserEventTriggered(new SegmentReply(b));
+            }
+            // Same as for segments: a dropped blob must not leave the
+            // decoder in the BLOB state.
+            reset();
+            return;
+        }
+
+        default:
+            throw new Exception("Unknown decoding state: " + state());
+        }
+    }
+
+    /**
+     * Reads a segment body: msb, lsb, hash and the payload.
+     *
+     * @param len frame length taken from the header
+     * @param type frame type taken from the header (not used here)
+     * @return the segment, or {@code null} if the payload hash does not match
+     * @throws IllegalStateException if {@code len} is too small to hold the
+     *             fixed fields
+     */
+    private Segment decodeSegment(ByteBuf in, int len, byte type) {
+        if (len < SEGMENT_HEADER_LEN) {
+            throw new IllegalStateException("Invalid segment length: " + len);
+        }
+        long msb = in.readLong();
+        long lsb = in.readLong();
+        long hash = in.readLong();
+        byte[] segment = new byte[len - SEGMENT_HEADER_LEN];
+        in.readBytes(segment);
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long check = hasher.putBytes(segment).hash().padToLong();
+        if (hash == check) {
+            SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
+            Segment s = new Segment(store.getTracker(), id,
+                    ByteBuffer.wrap(segment));
+            log.debug("received segment with id {} and size {}", id, s.size());
+            return s;
+        }
+        log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+        return null;
+    }
+
+    /**
+     * Reads a blob body: the length-prefixed UTF-8 blob id, the hash and the
+     * payload.
+     *
+     * @param length payload size taken from the header
+     * @param type frame type taken from the header (not used here)
+     * @return the blob, or {@code null} if the payload hash does not match
+     * @throws IllegalStateException if the payload or id length is negative
+     */
+    private IdArrayBasedBlob decodeBlob(ByteBuf in, int length, byte type) {
+        if (length < 0) {
+            throw new IllegalStateException("Invalid blob length: " + length);
+        }
+        int inIdLen = in.readInt();
+        if (inIdLen < 0) {
+            throw new IllegalStateException("Invalid blob id length: "
+                    + inIdLen);
+        }
+        byte[] bid = new byte[inIdLen];
+        in.readBytes(bid);
+        String id = new String(bid, Charset.forName("UTF-8"));
+
+        long hash = in.readLong();
+        byte[] blob = new byte[length];
+        in.readBytes(blob);
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long check = hasher.putBytes(blob).hash().padToLong();
+        if (hash == check) {
+            log.debug("received blob with id {} and size {}", id, blob.length);
+            return new IdArrayBasedBlob(blob, id);
+        }
+        log.debug("received corrupted binary {}, ignoring", id);
+        return null;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java Thu Dec 11 16:06:19 2014
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+@Deprecated
public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java Thu Dec 11 16:06:19 2014
@@ -20,14 +20,47 @@ import org.apache.jackrabbit.oak.plugins
public class SegmentReply {
+ public static final int SEGMENT = 0;
+ public static final int BLOB = 1;
+
+    /**
+     * Creates a reply that carries neither a segment nor a blob; its type is
+     * {@code -1}.
+     */
+    public static SegmentReply empty() {
+        final SegmentReply placeholder = new SegmentReply();
+        return placeholder;
+    }
+
+ private final int type;
+
private final Segment segment;
+ private final IdArrayBasedBlob blob;
+
public SegmentReply(Segment segment) {
+ this.type = SEGMENT;
this.segment = segment;
+ this.blob = null;
+ }
+
+    /**
+     * Creates a reply of type {@link #BLOB}; the segment is left {@code null}.
+     *
+     * @param blob the blob carried by this reply
+     */
+    public SegmentReply(IdArrayBasedBlob blob) {
+        this.blob = blob;
+        this.segment = null;
+        this.type = BLOB;
+    }
+
+ private SegmentReply() {
+ this.type = -1;
+ this.segment = null;
+ this.blob = null;
}
public Segment getSegment() {
return this.segment;
}
+    /**
+     * @return the blob carried by this reply, {@code null} unless the reply
+     *         was created from a blob
+     */
+    public IdArrayBasedBlob getBlob() {
+        return this.blob;
+    }
+
+    /**
+     * @return {@link #SEGMENT}, {@link #BLOB}, or {@code -1} for an empty
+     *         reply
+     */
+    public int getType() {
+        return this.type;
+    }
+
}
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java Thu Dec 11 16:06:19 2014
@@ -1,48 +1,55 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
-
-import org.apache.jackrabbit.oak.commons.jmx.Description;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-
-public interface ObservablePartnerMBean {
-
- @Nonnull
- @Description("name of the partner")
- String getName();
-
- @Description("IP of the remote")
- String getRemoteAddress();
-
- @Description("Last request")
- String getLastRequest();
-
- @Description("Port of the remote")
- int getRemotePort();
-
- @CheckForNull
- @Description("Time the remote instance was last contacted")
- String getLastSeenTimestamp();
-
- @Description("Number of transferred segments")
- long getTransferredSegments();
-
- @Description("Number of bytes stored in transferred segments")
- long getTransferredSegmentBytes();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+/**
+ * Read-only JMX view of the statistics kept for one communication partner.
+ */
+public interface ObservablePartnerMBean {
+
+    /** @return the name of the partner */
+    @Nonnull
+    @Description("name of the partner")
+    String getName();
+
+    /** @return the IP of the remote */
+    @Description("IP of the remote")
+    String getRemoteAddress();
+
+    /** @return the last request */
+    @Description("Last request")
+    String getLastRequest();
+
+    /** @return the port of the remote */
+    @Description("Port of the remote")
+    int getRemotePort();
+
+    /** @return the time the remote instance was last contacted, may be null */
+    @CheckForNull
+    @Description("Time the remote instance was last contacted")
+    String getLastSeenTimestamp();
+
+    /** @return the number of transferred segments */
+    @Description("Number of transferred segments")
+    long getTransferredSegments();
+
+    /** @return the number of bytes stored in transferred segments */
+    @Description("Number of bytes stored in transferred segments")
+    long getTransferredSegmentBytes();
+
+    /** @return the number of transferred binaries */
+    @Description("Number of transferred binaries")
+    long getTransferredBinaries();
+
+    /** @return the number of bytes stored in transferred binaries */
+    @Description("Number of bytes stored in transferred binaries")
+    long getTransferredBinariesBytes();
+
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java Thu Dec 11 16:06:19 2014
@@ -1,47 +1,47 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
-
-import org.apache.jackrabbit.oak.commons.jmx.Description;
-import javax.annotation.Nonnull;
-
-public interface StandbyStatusMBean {
- public static final String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"Standby\"";
- public static final String STATUS_INITIALIZING = "initializing";
- public static final String STATUS_STOPPED = "stopped";
- public static final String STATUS_STARTING = "starting";
- public static final String STATUS_RUNNING = "running";
- public static final String STATUS_CLOSING = "closing";
- public static final String STATUS_CLOSED = "closed";
-
- @Nonnull
- @Description("primary or standby")
- String getMode();
-
- @Description("current status of the service")
- String getStatus();
-
- @Description("instance is running")
- boolean isRunning();
-
- @Description("stop the communication")
- void stop();
-
- @Description("start the communication")
- void start();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import javax.annotation.Nonnull;
+
+/**
+ * JMX interface exposing the mode and status of a standby service and
+ * allowing its communication to be started and stopped.
+ */
+public interface StandbyStatusMBean {
+
+    // Interface fields are implicitly public, static and final.
+
+    /** JMX object name under which the status bean is registered. */
+    String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"Standby\"";
+
+    // Status values
+    String STATUS_INITIALIZING = "initializing";
+    String STATUS_STOPPED = "stopped";
+    String STATUS_STARTING = "starting";
+    String STATUS_RUNNING = "running";
+    String STATUS_CLOSING = "closing";
+    String STATUS_CLOSED = "closed";
+
+    /** @return primary or standby */
+    @Nonnull
+    @Description("primary or standby")
+    String getMode();
+
+    /** @return the current status of the service */
+    @Description("current status of the service")
+    String getStatus();
+
+    /** @return whether the instance is running */
+    @Description("instance is running")
+    boolean isRunning();
+
+    /** Stops the communication. */
+    @Description("stop the communication")
+    void stop();
+
+    /** Starts the communication. */
+    @Description("start the communication")
+    void start();
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java Thu Dec 11 16:06:19 2014
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -33,15 +34,15 @@ import io.netty.handler.codec.string.Str
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Future;
import java.io.Closeable;
import java.lang.management.ManagementFactory;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;
-import io.netty.util.concurrent.Future;
-
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.BlobEncoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdEncoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentEncoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
@@ -129,6 +130,7 @@ public class StandbyServer implements St
p.addLast(new SnappyFramedEncoder());
p.addLast(new RecordIdEncoder());
p.addLast(new SegmentEncoder());
+ p.addLast(new BlobEncoder());
p.addLast(handler);
}
});
@@ -169,12 +171,31 @@ public class StandbyServer implements St
public void run() {
try {
running = true;
+ handler.state = STATUS_RUNNING;
channelFuture.sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
StandbyServer.this.stop();
}
}
};
+        // Reacts to the outcome of the bind: starts the `close` thread on
+        // success, otherwise logs the cause, closes the channel and closes
+        // the server.
+        final ChannelFutureListener bindListener = new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) {
+                if (!future.isSuccess()) {
+                    log.error("Server failed to start, will be canceled",
+                            future.cause());
+                    future.channel().close();
+                    // NOTE(review): close() is invoked from a fresh thread,
+                    // presumably so it does not run on the thread that
+                    // completes the future -- confirm.
+                    final Thread closer = new Thread() {
+                        @Override
+                        public void run() {
+                            close();
+                        }
+                    };
+                    closer.start();
+                    return;
+                }
+                close.start();
+            }
+        };
Future<?> startup = bossGroup.submit(new Runnable() {
@Override
public void run() {
@@ -184,7 +205,7 @@ public class StandbyServer implements St
//the channel registration synchronous.
//Note that now this method will return immediately.
channelFuture = b.bind(port);
- close.start();
+ channelFuture.addListener(bindListener);
}
});
if (!startup.awaitUninterruptibly(10000)) {
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java Thu Dec 11 16:06:19 2014
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandler.S
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -158,7 +159,7 @@ public class StandbyServerHandler extend
} catch (IllegalStateException e) {
// segment not found
log.debug("waiting for segment. Got exception: " + e.getMessage());
- TimeUnit.MILLISECONDS.sleep(1000);
+ TimeUnit.MILLISECONDS.sleep(2000);
}
if (s != null) break;
}
@@ -169,6 +170,15 @@ public class StandbyServerHandler extend
observer.didSendSegmentBytes(clientID, s.size());
return;
}
+ } else if (request.startsWith(Messages.GET_BLOB)) {
+ String bid = request.substring(Messages.GET_BLOB.length());
+ log.debug("request blob id {}", bid);
+ Blob b = store.readBlob(bid);
+ log.debug("sending blob " + bid + " to " + client);
+ ctx.writeAndFlush(b);
+ observer.didSendBinariesBytes(clientID,
+ Math.max(0, (int) b.length()));
+ return;
} else {
log.warn("Unknown request {}, ignoring.", request);
}
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java Thu Dec 11 16:06:19 2014
@@ -1,179 +1,199 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby.store;
-
-
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-public class CommunicationObserver {
- private static final int MAX_CLIENT_STATISTICS = 10;
-
- private class CommunicationPartnerMBean implements ObservablePartnerMBean {
- private final ObjectName mbeanName;
- private final String clientName;
- public String lastRequest;
- public Date lastSeen;
- public String remoteAddress;
- public int remotePort;
- public long segmentsSent;
- public long segmentBytesSent;
-
- public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
- this.clientName = clientName;
- this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
- }
-
- public ObjectName getMBeanName() {
- return this.mbeanName;
- }
-
- @Override
- public String getName() {
- return this.clientName;
- }
-
- @Override
- public String getRemoteAddress() {
- return this.remoteAddress;
- }
-
- @Override
- public String getLastRequest() {
- return this.lastRequest;
- }
-
- @Override
- public int getRemotePort() {
- return this.remotePort;
- }
-
- @Override
- public String getLastSeenTimestamp() {
- return this.lastSeen == null ? null : this.lastSeen.toString();
- }
-
- @Override
- public long getTransferredSegments() {
- return this.segmentsSent;
- }
-
- @Override
- public long getTransferredSegmentBytes() {
- return this.segmentBytesSent;
- }
- }
-
- private static final Logger log = LoggerFactory
- .getLogger(CommunicationObserver.class);
-
- private final String identifier;
- private final Map<String, CommunicationPartnerMBean> partnerDetails;
-
- public CommunicationObserver(String myID) {
- this.identifier = myID;
- this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
- }
-
- private void unregister(CommunicationPartnerMBean m) {
- final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
- try {
- jmxServer.unregisterMBean(m.getMBeanName());
- }
- catch (Exception e) {
- log.error("error unregistering mbean for client '" + m.getName() + "'", e);
- }
- }
-
- public void unregister() {
- for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
- unregister(m);
- }
- }
-
- public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
- log.debug("got message '" + request + "' from client " + client);
- CommunicationPartnerMBean m = this.partnerDetails.get(client);
- boolean register = false;
- if (m == null) {
- cleanUp();
- m = new CommunicationPartnerMBean(client);
- m.remoteAddress = remote.getAddress().getHostAddress();
- m.remotePort = remote.getPort();
- register = true;
- }
- m.lastSeen = new Date();
- m.lastRequest = request;
- this.partnerDetails.put(client, m);
- if (register) {
- final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
- try {
- jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
- }
- catch (Exception e) {
- log.error("can register mbean for client '" + m.getName() + "'", e);
- }
- }
- }
-
- public void didSendSegmentBytes(String client, int size) {
- log.debug("did send segment with " + size + " bytes to client " + client);
- CommunicationPartnerMBean m = this.partnerDetails.get(client);
- m.segmentsSent++;
- m.segmentBytesSent += size;
- this.partnerDetails.put(client, m);
- }
-
- public String getID() {
- return this.identifier;
- }
-
- // helper
-
- private void cleanUp() {
- while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
- CommunicationPartnerMBean oldestEntry = oldestEntry();
- if (oldestEntry == null) return;
- log.info("housekeeping: removing statistics for " + oldestEntry.getName());
- unregister(oldestEntry);
- this.partnerDetails.remove(oldestEntry.getName());
- }
- }
-
- private CommunicationPartnerMBean oldestEntry() {
- CommunicationPartnerMBean ret = null;
- for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
- if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
- }
- return ret;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+
+
+import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Keeps per-client communication statistics and exposes each client as an
+ * {@link ObservablePartnerMBean} on the platform MBean server. At most
+ * {@code MAX_CLIENT_STATISTICS} clients are tracked; when the limit is
+ * reached the entry that was seen least recently is dropped.
+ * <p>
+ * NOTE(review): the statistics live in a plain {@link HashMap} with no
+ * synchronization -- confirm that callers never invoke this concurrently.
+ */
+public class CommunicationObserver {
+    private static final int MAX_CLIENT_STATISTICS = 10;
+
+    /** Mutable statistics of a single client, published through JMX. */
+    private static class CommunicationPartnerMBean implements ObservablePartnerMBean {
+        private final ObjectName mbeanName;
+        private final String clientName;
+        public String lastRequest;
+        public Date lastSeen;
+        public String remoteAddress;
+        public int remotePort;
+        public long segmentsSent;
+        public long segmentBytesSent;
+        public long binariesSent;
+        public long binariesBytesSent;
+
+        public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
+            this.clientName = clientName;
+            this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
+        }
+
+        public ObjectName getMBeanName() {
+            return this.mbeanName;
+        }
+
+        @Override
+        public String getName() {
+            return this.clientName;
+        }
+
+        @Override
+        public String getRemoteAddress() {
+            return this.remoteAddress;
+        }
+
+        @Override
+        public String getLastRequest() {
+            return this.lastRequest;
+        }
+
+        @Override
+        public int getRemotePort() {
+            return this.remotePort;
+        }
+
+        @Override
+        public String getLastSeenTimestamp() {
+            return this.lastSeen == null ? null : this.lastSeen.toString();
+        }
+
+        @Override
+        public long getTransferredSegments() {
+            return this.segmentsSent;
+        }
+
+        @Override
+        public long getTransferredSegmentBytes() {
+            return this.segmentBytesSent;
+        }
+
+        @Override
+        public long getTransferredBinaries() {
+            return this.binariesSent;
+        }
+
+        @Override
+        public long getTransferredBinariesBytes() {
+            return this.binariesBytesSent;
+        }
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(CommunicationObserver.class);
+
+    private final String identifier;
+    private final Map<String, CommunicationPartnerMBean> partnerDetails;
+
+    /**
+     * @param myID identifier returned by {@link #getID()}
+     */
+    public CommunicationObserver(String myID) {
+        this.identifier = myID;
+        this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
+    }
+
+    /** Removes one client bean from the MBean server; failures are logged. */
+    private void unregister(CommunicationPartnerMBean m) {
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.unregisterMBean(m.getMBeanName());
+        }
+        catch (Exception e) {
+            log.error("error unregistering mbean for client '" + m.getName() + "'", e);
+        }
+    }
+
+    /** Removes the beans of all tracked clients from the MBean server. */
+    public void unregister() {
+        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+            unregister(m);
+        }
+    }
+
+    /**
+     * Records a request from a client. The first request of a client creates
+     * its statistics entry (evicting the oldest one if the limit is reached)
+     * and registers it with the MBean server.
+     *
+     * @param client id of the client
+     * @param request the request that was received
+     * @param remote remote address of the client, stored when the entry is created
+     * @throws MalformedObjectNameException if no valid JMX object name can be
+     *             built from the client id
+     */
+    public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
+        log.debug("got message '{}' from client {}", request, client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        boolean register = false;
+        if (m == null) {
+            cleanUp();
+            m = new CommunicationPartnerMBean(client);
+            m.remoteAddress = remote.getAddress().getHostAddress();
+            m.remotePort = remote.getPort();
+            register = true;
+        }
+        m.lastSeen = new Date();
+        m.lastRequest = request;
+        this.partnerDetails.put(client, m);
+        if (register) {
+            final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+            try {
+                jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
+            }
+            catch (Exception e) {
+                log.error("can't register mbean for client '" + m.getName() + "'", e);
+            }
+        }
+    }
+
+    /**
+     * Counts one segment of {@code size} bytes as sent to {@code client}.
+     * Does nothing if no statistics are tracked for that client.
+     */
+    public void didSendSegmentBytes(String client, int size) {
+        log.debug("did send segment with {} bytes to client {}", size, client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        if (m == null) {
+            // the entry may have been evicted by cleanUp() or never created
+            log.debug("no statistics tracked for client {}, ignoring", client);
+            return;
+        }
+        m.segmentsSent++;
+        m.segmentBytesSent += size;
+    }
+
+    /**
+     * Counts one binary of {@code size} bytes as sent to {@code client}.
+     * Does nothing if no statistics are tracked for that client.
+     */
+    public void didSendBinariesBytes(String client, int size) {
+        log.debug("did send binary with {} bytes to client {}", size, client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        if (m == null) {
+            // the entry may have been evicted by cleanUp() or never created
+            log.debug("no statistics tracked for client {}, ignoring", client);
+            return;
+        }
+        m.binariesSent++;
+        m.binariesBytesSent += size;
+    }
+
+    public String getID() {
+        return this.identifier;
+    }
+
+    // helper
+
+    /** Evicts the oldest entries until there is room for one more client. */
+    private void cleanUp() {
+        while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
+            CommunicationPartnerMBean oldestEntry = oldestEntry();
+            if (oldestEntry == null) return;
+            log.info("housekeeping: removing statistics for " + oldestEntry.getName());
+            unregister(oldestEntry);
+            this.partnerDetails.remove(oldestEntry.getName());
+        }
+    }
+
+    /** @return the entry with the earliest lastSeen, or null if there is none */
+    private CommunicationPartnerMBean oldestEntry() {
+        CommunicationPartnerMBean ret = null;
+        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+            if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
+        }
+        return ret;
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java Thu Dec 11 16:06:19 2014
@@ -16,14 +16,19 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
+ /**
+  * Contract for looking up {@link Segment}s and {@link Blob}s by string id,
+  * plus lifecycle queries.
+  * NOTE(review): the name suggests the data is fetched from a remote peer; where
+  * it actually comes from is decided by the implementations, which are not
+  * visible here.
+  */
public interface RemoteSegmentLoader {
+ /** Looks up a segment by {@code id}; handling of unknown ids is not specified here. */
Segment readSegment(String id);
+ /** Looks up a blob by {@code blobId}; handling of unknown ids is not specified here. */
+ Blob readBlob(String blobId);
+
+ /** Closes this loader. */
void close();
+ /** Reports whether this loader is closed (presumably true once {@link #close()} was called — confirm). */
boolean isClosed();
+ /** Reports whether this loader is running; the exact meaning is implementation-defined — TODO confirm. */
+ boolean isRunning();
+
}
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java Thu Dec 11 16:06:19 2014
@@ -144,7 +144,7 @@ public class StandbyStoreService {
Dictionary<Object, Object> dictionary = new Hashtable<Object, Object>();
dictionary.put("scheduler.period", interval);
dictionary.put("scheduler.concurrent", false);
- dictionary.put("scheduler.runOn", "SINGLE");
+ // dictionary.put("scheduler.runOn", "SINGLE");
syncReg = context.getBundleContext().registerService(
Runnable.class.getName(), sync, dictionary);
Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,134 @@
+package org.apache.jackrabbit.oak.plugins.segment.standby;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
+import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+
+/**
+ * Integration test that syncs a primary store to a standby store where each
+ * side keeps its binaries in its own {@link FileDataStore} directory, then
+ * checks that a 5 MB blob written on the primary can be read back from the
+ * standby.
+ */
+public class ExternalPrivateStoreIT extends TestBase {
+
+    /** Blob directory of the primary; assigned in {@link #setupPrimary(File)}. */
+    private File primaryStore;
+    /** Blob directory of the standby; assigned in {@link #setupSecondary(File)}. */
+    private File secondaryStore;
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+        // Best-effort cleanup: deleteQuietly accepts null (set-up may have failed
+        // before a directory was assigned) and never throws, so a problem with the
+        // first directory no longer prevents removal of the second one.
+        FileUtils.deleteQuietly(primaryStore);
+        FileUtils.deleteQuietly(secondaryStore);
+    }
+
+    @Override
+    protected FileStore setupPrimary(File d) throws IOException {
+        primaryStore = createTmpTargetDir("ExternalStoreITPrimary");
+        return newFileStore(primaryStore, d);
+    }
+
+    @Override
+    protected FileStore setupSecondary(File d) throws IOException {
+        secondaryStore = createTmpTargetDir("ExternalStoreITSecondary");
+        return newFileStore(secondaryStore, d);
+    }
+
+    /**
+     * Opens a {@link FileStore} in {@code d} whose blob store is a
+     * {@link FileDataStore} initialised at {@code blobDir}.
+     */
+    private static FileStore newFileStore(File blobDir, File d) throws IOException {
+        FileDataStore fds = new FileDataStore();
+        fds.setMinRecordLength(4092);
+        fds.init(blobDir.getAbsolutePath());
+        return new FileStore(new DataStoreBlobStore(fds), d, 1, false);
+    }
+
+    /**
+     * Writes a random 5 MB blob on the primary, runs one client sync and verifies
+     * that both heads are equal, both segment stores stay below 1 MB, and the
+     * blob read through the standby matches the written bytes.
+     */
+    @Test
+    public void testSync() throws Exception {
+        final int mb = 1 * 1024 * 1024;
+        final int blobSize = 5 * mb;
+        FileStore primary = getPrimary();
+        FileStore secondary = getSecondary();
+
+        NodeStore store = new SegmentNodeStore(primary);
+        final StandbyServer server = new StandbyServer(getPort(), primary);
+        server.start();
+
+        byte[] data;
+        StandbyClient cl = null;
+        try {
+            // Everything after start() is inside the try so the server (and the
+            // client, once created) is closed even if writing content or the
+            // sync itself fails.
+            data = addTestContent(store, "server", blobSize);
+            cl = new StandbyClient("127.0.0.1", getPort(), secondary);
+            cl.run();
+            assertEquals(primary.getHead(), secondary.getHead());
+        } finally {
+            server.close();
+            if (cl != null) {
+                cl.close();
+            }
+        }
+
+        assertTrue(primary.size() < mb);
+        assertTrue(secondary.size() < mb);
+
+        PropertyState ps = secondary.getHead().getChildNode("root")
+                .getChildNode("server").getProperty("testBlob");
+        assertNotNull(ps);
+        assertEquals(Type.BINARY.tag(), ps.getType().tag());
+        Blob b = ps.getValue(Type.BINARY);
+        assertEquals(blobSize, b.length());
+        byte[] testData = new byte[blobSize];
+        // Close the blob stream explicitly so it is released before after()
+        // deletes the data store directories.
+        java.io.InputStream in = b.getNewStream();
+        try {
+            ByteStreams.readFully(in, testData);
+        } finally {
+            in.close();
+        }
+        assertArrayEquals(data, testData);
+    }
+
+    /**
+     * Commits a child node named {@code child} carrying a timestamp property
+     * {@code ts} and a binary property {@code testBlob} of {@code size} random
+     * bytes.
+     *
+     * @return the random bytes that were stored in {@code testBlob}
+     */
+    private static byte[] addTestContent(NodeStore store, String child, int size)
+            throws CommitFailedException, IOException {
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child(child).setProperty("ts", System.currentTimeMillis());
+
+        byte[] data = new byte[size];
+        new Random().nextBytes(data);
+        Blob blob = store.createBlob(new ByteArrayInputStream(data));
+
+        builder.child(child).setProperty("testBlob", blob);
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        return data;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,127 @@
+package org.apache.jackrabbit.oak.plugins.segment.standby;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
+import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+
+/**
+ * Integration test that syncs a primary store to a standby store where both
+ * sides open a {@link FileDataStore} on the same directory, then checks that a
+ * 5 MB blob written on the primary can be read back from the standby.
+ */
+public class ExternalSharedStoreIT extends TestBase {
+
+    /** Blob directory used by both stores; created on first use. */
+    private File externalStore;
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+        // Best-effort cleanup: deleteQuietly accepts null (set-up may have failed
+        // before the directory was assigned) and never throws.
+        FileUtils.deleteQuietly(externalStore);
+    }
+
+    @Override
+    protected FileStore setupPrimary(File d) throws IOException {
+        return newFileStore(sharedStoreDir(), d);
+    }
+
+    @Override
+    protected FileStore setupSecondary(File d) throws IOException {
+        return newFileStore(sharedStoreDir(), d);
+    }
+
+    /**
+     * Returns the shared blob directory, creating it on the first call. Resolving
+     * it lazily means neither set-up method depends on the other having run
+     * first, and both always end up on the same directory.
+     */
+    private File sharedStoreDir() throws IOException {
+        if (externalStore == null) {
+            externalStore = createTmpTargetDir("ExternalCommonStoreIT");
+        }
+        return externalStore;
+    }
+
+    /**
+     * Opens a {@link FileStore} in {@code d} whose blob store is a
+     * {@link FileDataStore} initialised at {@code blobDir}.
+     */
+    private static FileStore newFileStore(File blobDir, File d) throws IOException {
+        FileDataStore fds = new FileDataStore();
+        fds.setMinRecordLength(4092);
+        fds.init(blobDir.getAbsolutePath());
+        return new FileStore(new DataStoreBlobStore(fds), d, 1, false);
+    }
+
+    /**
+     * Writes a random 5 MB blob on the primary, runs one client sync and verifies
+     * that both heads are equal, both segment stores stay below 1 MB, and the
+     * blob read through the standby matches the written bytes.
+     */
+    @Test
+    public void testSync() throws Exception {
+        final int mb = 1 * 1024 * 1024;
+        final int blobSize = 5 * mb;
+        FileStore primary = getPrimary();
+        FileStore secondary = getSecondary();
+
+        NodeStore store = new SegmentNodeStore(primary);
+        final StandbyServer server = new StandbyServer(getPort(), primary);
+        server.start();
+
+        byte[] data;
+        StandbyClient cl = null;
+        try {
+            // Everything after start() is inside the try so the server (and the
+            // client, once created) is closed even if writing content or the
+            // sync itself fails.
+            data = addTestContent(store, "server", blobSize);
+            cl = new StandbyClient("127.0.0.1", getPort(), secondary);
+            cl.run();
+            assertEquals(primary.getHead(), secondary.getHead());
+        } finally {
+            server.close();
+            if (cl != null) {
+                cl.close();
+            }
+        }
+
+        assertTrue(primary.size() < mb);
+        assertTrue(secondary.size() < mb);
+
+        PropertyState ps = secondary.getHead().getChildNode("root")
+                .getChildNode("server").getProperty("testBlob");
+        assertNotNull(ps);
+        assertEquals(Type.BINARY.tag(), ps.getType().tag());
+        Blob b = ps.getValue(Type.BINARY);
+        assertEquals(blobSize, b.length());
+        byte[] testData = new byte[blobSize];
+        // Close the blob stream explicitly so it is released before after()
+        // deletes the shared data store directory.
+        java.io.InputStream in = b.getNewStream();
+        try {
+            ByteStreams.readFully(in, testData);
+        } finally {
+            in.close();
+        }
+        assertArrayEquals(data, testData);
+    }
+
+    /**
+     * Commits a child node named {@code child} carrying a timestamp property
+     * {@code ts} and a binary property {@code testBlob} of {@code size} random
+     * bytes.
+     *
+     * @return the random bytes that were stored in {@code testBlob}
+     */
+    private static byte[] addTestContent(NodeStore store, String child, int size)
+            throws CommitFailedException, IOException {
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child(child).setProperty("ts", System.currentTimeMillis());
+
+        byte[] data = new byte[size];
+        new Random().nextBytes(data);
+        Blob blob = store.createBlob(new ByteArrayInputStream(data));
+
+        builder.child(child).setProperty("testBlob", blob);
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        return data;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java
------------------------------------------------------------------------------
svn:mime-type = text/plain