You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/12/16 16:56:04 UTC
svn commit: r1645987 - in /jackrabbit/oak/branches/1.0: ./ oak-tarmk-standby/
oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/
oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/co...
Author: alexparvulescu
Date: Tue Dec 16 15:56:04 2014
New Revision: 1645987
URL: http://svn.apache.org/r1645987
Log:
OAK-2347 TarMK Cold Standby FSDS mirroring
- merged r1644689 and r1645611
Added:
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java
- copied, changed from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java
- copied unchanged from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java
- copied unchanged from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java
- copied unchanged from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java
- copied, changed from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java
- copied, changed from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java
Removed:
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentPreLoaderHandler.java
Modified:
jackrabbit/oak/branches/1.0/ (props changed)
jackrabbit/oak/branches/1.0/oak-tarmk-standby/pom.xml
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java (contents, props changed)
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java (contents, props changed)
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java (contents, props changed)
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java (contents, props changed)
jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/resources/logback-test.xml
Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 16 15:56:04 2014
@@ -1,2 +1,2 @@
-/jackrabbit/oak/trunk:1584578,1584602,1584614,1584616,1584709,1584781,1584937,1585297,1585304-1585305,1585420,1585424,1585427,1585448,1585465,1585468,1585486,1585497,1585509,1585647,1585655-1585656,1585661,1585665-1585666,1585669-1585670,1585673,1585680,1585719,1585763,1585770,1585896,1585904,1585907,1585940,1585949,1585951,1585956,1585962-1585963,1586287,1586320,1586364,1586372,1586655,1586836,1587130,1587224,1587399,1587408,1587472,1587485,1587488,1587538,1587580,1587807,1588033,1588042,1588046,1588066,1588201,1589025,1589101,1589137,1589141,1589263,1589440,1589442,1589484,1589488,1589661,1589664,1589682,1589708,1589741,1589748,1589789,1589794,1589850,1589864,1590628,1590660,1590684,1590697,1590701,1590980,1590988,1591101,1591226,1591229,1591293,1591314,1591317,1591362,1591374,1591381,1591438,1591467,1591552,1591704,1591713,1591715,1591723,1591874,1592487,1592512,1592658,1592665,1592677,1592742,1592744,1592787,1592809,1592955,1593036,1593048,1593061,1593133,1593210-1593211,1593231
,1593245,1593250,1593294,1593304,1593317,1593342,1593554,1594158-1594164,1594166-1594167,1594169,1594237,1594800,1594808,1594835,1594888,1595147,1595457,1595856,1596241,1596474,1596534,1596844,1597569,1597795,1597854,1597860,1598292,1598302,1598352,1598369,1598595,1598631,1598696,1598732,1598797-1598798,1599299,1599332,1599416,1599434,1599671,1600088,1600935,1601309,1601388,1601578,1601676,1601757,1601768,1601814,1601833,1601838,1601853,1601878,1601888,1601922,1602156,1602174,1602179,1602183,1602207,1602227,1602256,1602261,1602796-1602797,1602800,1602809,1602853,1602872,1602914,1603155,1603307,1603401,1603441,1603748,1604166,1605030,1605036,1605038,1605292,1605447,1605526,1605670,1605725,1605831,1605852,1606077,1606079,1606087,1606638,1606641,1606644,1606708,1606711,1607031-1607032,1607077,1607127,1607141,1607152,1607185,1607196,1607331,1607362,1607366,1607392,1607526,1607664,1607737,1608560,1608783,1609064,1609081,1609165,1609488,1610489,1610592,1610603,1610634,1610658,1610664,1611
021,1611041,1611275,1611277,1611313,1611332,1611584,1612560,1612825,1612993,1613018,1613041,1614265,1614272,1614344-1614345,1614384-1614385,1614397,1614405-1614406,1614574,1614591,1614593,1614596,1614604,1614689,1614807,1614835,1614891,1615417-1615418,1616182,1616236,1616463,1616719,1617417,1617451,1617463,1617711,1618158,1618613,1618624,1618709,1619222,1619411,1619695,1619800,1619808,1619815,1619823-1619824,1620512,1620581,1620585,1620634,1620898,1621115,1621123-1621124,1621168,1621192,1621201,1621706,1621962,1622197,1622201,1622207,1622250,1622479,1623364,1623766,1623827,1623949,1623969,1623973,1624216,1624317,1624551,1624973,1624993-1624994,1625025,1625036,1625158,1625224,1625237,1625299,1625348,1625620,1625916,1625962-1625963,1626021,1626053,1626163,1626168,1626175,1626191,1626265,1626770,1627047,1627052,1627228,1627346,1627470,1627473,1627479,1627503,1627586,1627590,1627715,1627731,1628180,1628198,1628262,1628447,1628608,1629688,1629840,1629917,1630055-1630057,1630156,1630299,1
630338,1630773,1631283-1631284,1631333-1631334,1631617-1631619,1631630,1631699,1631704,1631711,1631967-1631969,1631986,1631990,1631999,1632002-1632003,1632017,1632258,1632264,1632270,1632293,1632303,1632592,1632605,1633315,1633559-1633560,1633562,1633567,1633571,1633598,1633608,1633641,1633687,1633697,1633768,1633783,1634505,1634513,1634774,1634779,1634781,1634792,1634803,1634814,1634816,1634838,1634841,1634852,1634864,1634896,1634898,1635044-1635045,1635060,1635077,1635089,1635102,1635108,1635218,1635387,1635435,1635518,1635563,1635586,1636336,1636348,1636505,1636585,1636799,1637368,1637382,1637413,1637651,1637815,1638779-1638783,1639260,1639577,1639622,1639963,1639966,1639973,1640134,1640143,1640555-1640556,1640694-1640695,1640715,1640722-1640723,1640728,1640863-1640872,1641340,1641352,1641541,1641596-1641599,1641601,1641662,1641671,1641695,1641771,1641802,1641811,1641950,1642031,1642056,1642119,1642285,1642648,1642667,1642954,1642959,1643111,1643178,1643186,1643204,1643287,164376
7,1643774,1643982,1644016,1644106,1644366,1644383,1644397-1644398,1644407,1644479,1644547,1644552,1644554,1644650,1644654,1644750,1645421,1645424,1645459,1645585,1645637,1645661-1645662,1645888,1645901,1645948,1645966,1645970-1645971
+/jackrabbit/oak/trunk:1584578,1584602,1584614,1584616,1584709,1584781,1584937,1585297,1585304-1585305,1585420,1585424,1585427,1585448,1585465,1585468,1585486,1585497,1585509,1585647,1585655-1585656,1585661,1585665-1585666,1585669-1585670,1585673,1585680,1585719,1585763,1585770,1585896,1585904,1585907,1585940,1585949,1585951,1585956,1585962-1585963,1586287,1586320,1586364,1586372,1586655,1586836,1587130,1587224,1587399,1587408,1587472,1587485,1587488,1587538,1587580,1587807,1588033,1588042,1588046,1588066,1588201,1589025,1589101,1589137,1589141,1589263,1589440,1589442,1589484,1589488,1589661,1589664,1589682,1589708,1589741,1589748,1589789,1589794,1589850,1589864,1590628,1590660,1590684,1590697,1590701,1590980,1590988,1591101,1591226,1591229,1591293,1591314,1591317,1591362,1591374,1591381,1591438,1591467,1591552,1591704,1591713,1591715,1591723,1591874,1592487,1592512,1592658,1592665,1592677,1592742,1592744,1592787,1592809,1592955,1593036,1593048,1593061,1593133,1593210-1593211,1593231
,1593245,1593250,1593294,1593304,1593317,1593342,1593554,1594158-1594164,1594166-1594167,1594169,1594237,1594800,1594808,1594835,1594888,1595147,1595457,1595856,1596241,1596474,1596534,1596844,1597569,1597795,1597854,1597860,1598292,1598302,1598352,1598369,1598595,1598631,1598696,1598732,1598797-1598798,1599299,1599332,1599416,1599434,1599671,1600088,1600935,1601309,1601388,1601578,1601676,1601757,1601768,1601814,1601833,1601838,1601853,1601878,1601888,1601922,1602156,1602174,1602179,1602183,1602207,1602227,1602256,1602261,1602796-1602797,1602800,1602809,1602853,1602872,1602914,1603155,1603307,1603401,1603441,1603748,1604166,1605030,1605036,1605038,1605292,1605447,1605526,1605670,1605725,1605831,1605852,1606077,1606079,1606087,1606638,1606641,1606644,1606708,1606711,1607031-1607032,1607077,1607127,1607141,1607152,1607185,1607196,1607331,1607362,1607366,1607392,1607526,1607664,1607737,1608560,1608783,1609064,1609081,1609165,1609488,1610489,1610592,1610603,1610634,1610658,1610664,1611
021,1611041,1611275,1611277,1611313,1611332,1611584,1612560,1612825,1612993,1613018,1613041,1614265,1614272,1614344-1614345,1614384-1614385,1614397,1614405-1614406,1614574,1614591,1614593,1614596,1614604,1614689,1614807,1614835,1614891,1615417-1615418,1616182,1616236,1616463,1616719,1617417,1617451,1617463,1617711,1618158,1618613,1618624,1618709,1619222,1619411,1619695,1619800,1619808,1619815,1619823-1619824,1620512,1620581,1620585,1620634,1620898,1621115,1621123-1621124,1621168,1621192,1621201,1621706,1621962,1622197,1622201,1622207,1622250,1622479,1623364,1623766,1623827,1623949,1623969,1623973,1624216,1624317,1624551,1624973,1624993-1624994,1625025,1625036,1625158,1625224,1625237,1625299,1625348,1625620,1625916,1625962-1625963,1626021,1626053,1626163,1626168,1626175,1626191,1626265,1626770,1627047,1627052,1627228,1627346,1627470,1627473,1627479,1627503,1627586,1627590,1627715,1627731,1628180,1628198,1628262,1628447,1628608,1629688,1629840,1629917,1630055-1630057,1630156,1630299,1
630338,1630773,1631283-1631284,1631333-1631334,1631617-1631619,1631630,1631699,1631704,1631711,1631967-1631969,1631986,1631990,1631999,1632002-1632003,1632017,1632258,1632264,1632270,1632293,1632303,1632592,1632605,1633315,1633559-1633560,1633562,1633567,1633571,1633598,1633608,1633641,1633687,1633697,1633768,1633783,1634505,1634513,1634774,1634779,1634781,1634792,1634803,1634814,1634816,1634838,1634841,1634852,1634864,1634896,1634898,1635044-1635045,1635060,1635077,1635089,1635102,1635108,1635218,1635387,1635435,1635518,1635563,1635586,1636336,1636348,1636505,1636585,1636799,1637368,1637382,1637413,1637651,1637815,1638779-1638783,1639260,1639577,1639622,1639963,1639966,1639973,1640134,1640143,1640555-1640556,1640694-1640695,1640715,1640722-1640723,1640728,1640863-1640872,1641340,1641352,1641541,1641596-1641599,1641601,1641662,1641671,1641695,1641771,1641802,1641811,1641950,1642031,1642056,1642119,1642285,1642648,1642667,1642954,1642959,1643111,1643178,1643186,1643204,1643287,164376
7,1643774,1643982,1644016,1644106,1644366,1644383,1644397-1644398,1644407,1644479,1644547,1644552,1644554,1644650,1644654,1644689,1644750,1645421,1645424,1645459,1645585,1645611,1645637,1645661-1645662,1645888,1645901,1645948,1645966,1645970-1645971
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/pom.xml?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/pom.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/pom.xml Tue Dec 16 15:56:04 2014
@@ -141,6 +141,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -202,5 +208,11 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-data</artifactId>
+ <version>2.9.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java Tue Dec 16 15:56:04 2014
@@ -18,6 +18,7 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetBlobReq;
import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -28,16 +29,17 @@ import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentReply;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
-import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
+import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,24 +52,23 @@ public class SegmentLoaderHandler extend
private final StandbyStore store;
private final String clientID;
private final RecordId head;
- private final EventExecutorGroup preloaderExecutor;
private final EventExecutorGroup loaderExecutor;
+ private final AtomicBoolean running;
- private int timeoutMs = 5000;
+ private int timeoutMs = 120000;
private ChannelHandlerContext ctx;
- final BlockingQueue<Segment> segment = new LinkedBlockingQueue<Segment>();
+ final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();
public SegmentLoaderHandler(final StandbyStore store, RecordId head,
- EventExecutorGroup preloaderExecutor,
EventExecutorGroup loaderExecutor,
- String clientID) {
+ String clientID, AtomicBoolean running) {
this.store = store;
this.head = head;
- this.preloaderExecutor = preloaderExecutor;
this.loaderExecutor = loaderExecutor;
this.clientID = clientID;
+ this.running = running;
}
@Override
@@ -80,8 +81,7 @@ public class SegmentLoaderHandler extend
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof SegmentReply) {
- //log.debug("offering segment " + ((SegmentReply) evt).getSegment());
- segment.offer(((SegmentReply) evt).getSegment());
+ segment.offer((SegmentReply) evt);
}
}
@@ -97,10 +97,10 @@ public class SegmentLoaderHandler extend
SegmentNodeState current = new SegmentNodeState(head);
do {
try {
- current.compareAgainstBaseState(before, new ApplyDiff(builder));
+ current.compareAgainstBaseState(before,
+ new StandbyApplyDiff(builder, store, this));
break;
- }
- catch (SegmentNotFoundException e) {
+ } catch (SegmentNotFoundException e) {
// the segment is locally damaged or not present anymore
// lets try to read this from the primary again
String id = e.getSegmentId();
@@ -114,8 +114,7 @@ public class SegmentLoaderHandler extend
ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size());
try {
s.writeTo(bout);
- }
- catch (IOException f) {
+ } catch (IOException f) {
log.error("can't wrap segment to output stream", f);
throw e;
}
@@ -137,25 +136,50 @@ public class SegmentLoaderHandler extend
}
@Override
+ public Blob readBlob(String blobId) {
+ ctx.writeAndFlush(newGetBlobReq(this.clientID, blobId));
+ return getBlob(blobId);
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- log.warn("Closing channel. Got exception: " + cause);
- ctx.close();
+ log.error("Exception caught, closing channel.", cause);
+ close();
+ }
+
+ private Segment getSegment(final String id) {
+ return getReply(id, SegmentReply.SEGMENT).getSegment();
}
- // implementation of RemoteSegmentLoader
+ private Blob getBlob(final String id) {
+ return getReply(id, SegmentReply.BLOB).getBlob();
+ }
- public Segment getSegment(final String id) {
+ private SegmentReply getReply(final String id, int type) {
boolean interrupted = false;
try {
for (;;) {
try {
- Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
- if (s == null) {
- return null;
+ SegmentReply r = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ if (r == null) {
+ log.warn("timeout waiting for {}", id);
+ return SegmentReply.empty();
}
- if (s.getSegmentId().toString().equals(id)) {
- return s;
+ if (r.getType() == type) {
+ switch (r.getType()) {
+ case SegmentReply.SEGMENT:
+ if (r.getSegment().getSegmentId().toString()
+ .equals(id)) {
+ return r;
+ }
+ break;
+ case SegmentReply.BLOB:
+ if (r.getBlob().getBlobId().equals(id)) {
+ return r;
+ }
+ break;
+ }
}
} catch (InterruptedException ignore) {
interrupted = true;
@@ -166,24 +190,26 @@ public class SegmentLoaderHandler extend
Thread.currentThread().interrupt();
}
}
-
}
+ @Override
public void close() {
ctx.close();
- if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
- preloaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
loaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
.syncUninterruptibly();
}
}
+ @Override
public boolean isClosed() {
return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor
.isShutdown()));
}
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
}
Copied: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java (from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java?p2=jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java&p1=jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java&r1=1644689&r2=1645987&rev=1645987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java Tue Dec 16 15:56:04 2014
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.jackrabbit.oak.plugins.segment.standby.client;
import static org.apache.jackrabbit.oak.api.Type.BINARIES;
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java Tue Dec 16 15:56:04 2014
@@ -40,6 +40,7 @@ import java.io.Closeable;
import java.lang.management.ManagementFactory;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
@@ -72,12 +73,13 @@ public final class StandbyClient impleme
private EventExecutorGroup executor;
private SslContext sslContext;
private boolean active = false;
- private boolean running;
private int failedRequests;
private long lastSuccessfulRequest;
private volatile String state;
private final Object sync = new Object();
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
public StandbyClient(String host, int port, SegmentStore store) throws SSLException {
this(host, port, store, false);
}
@@ -124,6 +126,10 @@ public final class StandbyClient impleme
}
public void run() {
+ if (!isRunning()) {
+ // manually stopped
+ return;
+ }
Bootstrap b;
synchronized (this.sync) {
@@ -132,7 +138,7 @@ public final class StandbyClient impleme
}
state = STATUS_STARTING;
executor = new DefaultEventExecutorGroup(4);
- handler = new StandbyClientHandler(this.store, executor, this.observer);
+ handler = new StandbyClientHandler(this.store, executor, this.observer, this.running);
group = new NioEventLoopGroup();
b = new Bootstrap();
@@ -160,7 +166,6 @@ public final class StandbyClient impleme
}
});
state = STATUS_RUNNING;
- this.running = true;
this.active = true;
}
@@ -200,20 +205,19 @@ public final class StandbyClient impleme
}
@Override
- public boolean isRunning() { return running;}
+ public boolean isRunning() {
+ return running.get();
+ }
@Override
public void start() {
- if (!running) run();
+ running.set(true);
}
@Override
public void stop() {
- //TODO running flag doesn't make sense this way, since run() is usually scheduled to be called repeatedly.
- if (running) {
- running = false;
- state = STATUS_STOPPED;
- }
+ running.set(false);
+ state = STATUS_STOPPED;
}
@Override
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java Tue Dec 16 15:56:04 2014
@@ -26,17 +26,18 @@ import io.netty.util.concurrent.EventExe
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentDecoder;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StandbyClientHandler extends
- SimpleChannelInboundHandler<RecordId> implements Closeable {
+public class StandbyClientHandler extends SimpleChannelInboundHandler<RecordId>
+ implements Closeable {
private static final Logger log = LoggerFactory
.getLogger(StandbyClientHandler.class);
@@ -44,16 +45,18 @@ public class StandbyClientHandler extend
private final StandbyStore store;
private final EventExecutorGroup executor;
private final CommunicationObserver observer;
- private EventExecutorGroup preloaderExecutor;
- private EventExecutorGroup loaderExecutor;
+ private final AtomicBoolean running;
+ private EventExecutorGroup loaderExecutor;
private ChannelHandlerContext ctx;
public StandbyClientHandler(final StandbyStore store,
- EventExecutorGroup executor, CommunicationObserver observer) {
+ EventExecutorGroup executor, CommunicationObserver observer,
+ AtomicBoolean running) {
this.store = store;
this.executor = executor;
this.observer = observer;
+ this.running = running;
}
@Override
@@ -87,33 +90,31 @@ public class StandbyClientHandler extend
log.debug("updating current head to " + head);
ctx.pipeline().remove(RecordIdDecoder.class);
ctx.pipeline().remove(this);
- ctx.pipeline().addLast(new SegmentDecoder(store));
-
- preloaderExecutor = new DefaultEventExecutorGroup(4);
- SegmentPreLoaderHandler h1 = new SegmentPreLoaderHandler();
- ctx.pipeline().addLast(preloaderExecutor, h1);
+ ctx.pipeline().addLast(new ReplyDecoder(store));
loaderExecutor = new DefaultEventExecutorGroup(4);
SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
- preloaderExecutor, loaderExecutor, this.observer.getID());
+ loaderExecutor, this.observer.getID(), running);
ctx.pipeline().addLast(loaderExecutor, h2);
- h1.channelActive(ctx);
h2.channelActive(ctx);
log.debug("updating current head finished");
}
@Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ log.error("Exception caught, closing channel.", cause);
+ close();
+ }
+
+ @Override
public void close() {
ctx.close();
if (!executor.isShuttingDown()) {
executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
.syncUninterruptibly();
}
- if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
- preloaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
loaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
.syncUninterruptibly();
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java Tue Dec 16 15:56:04 2014
@@ -22,9 +22,11 @@ public class Messages {
public static final byte HEADER_RECORD = 0x00;
public static final byte HEADER_SEGMENT = 0x01;
+ public static final byte HEADER_BLOB = 0x02;
public static final String GET_HEAD = "h";
public static final String GET_SEGMENT = "s.";
+ public static final String GET_BLOB = "b.";
private static final String MAGIC = "Standby-CMD@";
private static final String SEPARATOR = ":";
@@ -41,6 +43,10 @@ public class Messages {
return newRequest(clientID, GET_SEGMENT + sid);
}
+ public static String newGetBlobReq(String clientID, String blobId) {
+ return newRequest(clientID, GET_BLOB + blobId);
+ }
+
public static String extractMessageFrom(String payload) {
if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
int i = payload.indexOf(SEPARATOR);
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java Tue Dec 16 15:56:04 2014
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+@Deprecated
public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java Tue Dec 16 15:56:04 2014
@@ -20,14 +20,47 @@ import org.apache.jackrabbit.oak.plugins
public class SegmentReply {
+ public static final int SEGMENT = 0;
+ public static final int BLOB = 1;
+
+ public static SegmentReply empty() {
+ return new SegmentReply();
+ }
+
+ private final int type;
+
private final Segment segment;
+ private final IdArrayBasedBlob blob;
+
public SegmentReply(Segment segment) {
+ this.type = SEGMENT;
this.segment = segment;
+ this.blob = null;
+ }
+
+ public SegmentReply(IdArrayBasedBlob blob) {
+ this.type = BLOB;
+ this.segment = null;
+ this.blob = blob;
+ }
+
+ private SegmentReply() {
+ this.type = -1;
+ this.segment = null;
+ this.blob = null;
}
public Segment getSegment() {
return this.segment;
}
+ public IdArrayBasedBlob getBlob() {
+ return blob;
+ }
+
+ public int getType() {
+ return type;
+ }
+
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java Tue Dec 16 15:56:04 2014
@@ -1,48 +1,55 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
-
-import org.apache.jackrabbit.oak.commons.jmx.Description;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-
-public interface ObservablePartnerMBean {
-
- @Nonnull
- @Description("name of the partner")
- String getName();
-
- @Description("IP of the remote")
- String getRemoteAddress();
-
- @Description("Last request")
- String getLastRequest();
-
- @Description("Port of the remote")
- int getRemotePort();
-
- @CheckForNull
- @Description("Time the remote instance was last contacted")
- String getLastSeenTimestamp();
-
- @Description("Number of transferred segments")
- long getTransferredSegments();
-
- @Description("Number of bytes stored in transferred segments")
- long getTransferredSegmentBytes();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+public interface ObservablePartnerMBean {
+
+ @Nonnull
+ @Description("name of the partner")
+ String getName();
+
+ @Description("IP of the remote")
+ String getRemoteAddress();
+
+ @Description("Last request")
+ String getLastRequest();
+
+ @Description("Port of the remote")
+ int getRemotePort();
+
+ @CheckForNull
+ @Description("Time the remote instance was last contacted")
+ String getLastSeenTimestamp();
+
+ @Description("Number of transferred segments")
+ long getTransferredSegments();
+
+ @Description("Number of bytes stored in transferred segments")
+ long getTransferredSegmentBytes();
+
+ @Description("Number of transferred binaries")
+ long getTransferredBinaries();
+
+ @Description("Number of bytes stored in transferred binaries")
+ long getTransferredBinariesBytes();
+
+}
Propchange: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java Tue Dec 16 15:56:04 2014
@@ -1,47 +1,47 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
-
-import org.apache.jackrabbit.oak.commons.jmx.Description;
-import javax.annotation.Nonnull;
-
-public interface StandbyStatusMBean {
- public static final String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"Standby\"";
- public static final String STATUS_INITIALIZING = "initializing";
- public static final String STATUS_STOPPED = "stopped";
- public static final String STATUS_STARTING = "starting";
- public static final String STATUS_RUNNING = "running";
- public static final String STATUS_CLOSING = "closing";
- public static final String STATUS_CLOSED = "closed";
-
- @Nonnull
- @Description("primary or standby")
- String getMode();
-
- @Description("current status of the service")
- String getStatus();
-
- @Description("instance is running")
- boolean isRunning();
-
- @Description("stop the communication")
- void stop();
-
- @Description("start the communication")
- void start();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import javax.annotation.Nonnull;
+
+public interface StandbyStatusMBean {
+ public static final String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"Standby\"";
+ public static final String STATUS_INITIALIZING = "initializing";
+ public static final String STATUS_STOPPED = "stopped";
+ public static final String STATUS_STARTING = "starting";
+ public static final String STATUS_RUNNING = "running";
+ public static final String STATUS_CLOSING = "closing";
+ public static final String STATUS_CLOSED = "closed";
+
+ @Nonnull
+ @Description("primary or standby")
+ String getMode();
+
+ @Description("current status of the service")
+ String getStatus();
+
+ @Description("instance is running")
+ boolean isRunning();
+
+ @Description("stop the communication")
+ void stop();
+
+ @Description("start the communication")
+ void start();
+}
Propchange: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java Tue Dec 16 15:56:04 2014
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -33,15 +34,15 @@ import io.netty.handler.codec.string.Str
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Future;
import java.io.Closeable;
import java.lang.management.ManagementFactory;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;
-import io.netty.util.concurrent.Future;
-
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.BlobEncoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdEncoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentEncoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
@@ -129,6 +130,7 @@ public class StandbyServer implements St
p.addLast(new SnappyFramedEncoder());
p.addLast(new RecordIdEncoder());
p.addLast(new SegmentEncoder());
+ p.addLast(new BlobEncoder());
p.addLast(handler);
}
});
@@ -169,12 +171,31 @@ public class StandbyServer implements St
public void run() {
try {
running = true;
+ handler.state = STATUS_RUNNING;
channelFuture.sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
StandbyServer.this.stop();
}
}
};
+ final ChannelFutureListener bindListener = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ close.start();
+ } else {
+ log.error("Server failed to start, will be canceled",
+ future.cause());
+ future.channel().close();
+ new Thread() {
+ @Override
+ public void run() {
+ close();
+ }
+ }.start();
+ }
+ }
+ };
Future<?> startup = bossGroup.submit(new Runnable() {
@Override
public void run() {
@@ -184,7 +205,7 @@ public class StandbyServer implements St
//the channel registration synchronous.
//Note that now this method will return immediately.
channelFuture = b.bind(port);
- close.start();
+ channelFuture.addListener(bindListener);
}
});
if (!startup.awaitUninterruptibly(10000)) {
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java Tue Dec 16 15:56:04 2014
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandler.S
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -158,7 +159,7 @@ public class StandbyServerHandler extend
} catch (IllegalStateException e) {
// segment not found
log.debug("waiting for segment. Got exception: " + e.getMessage());
- TimeUnit.MILLISECONDS.sleep(1000);
+ TimeUnit.MILLISECONDS.sleep(2000);
}
if (s != null) break;
}
@@ -169,6 +170,15 @@ public class StandbyServerHandler extend
observer.didSendSegmentBytes(clientID, s.size());
return;
}
+ } else if (request.startsWith(Messages.GET_BLOB)) {
+ String bid = request.substring(Messages.GET_BLOB.length());
+ log.debug("request blob id {}", bid);
+ Blob b = store.readBlob(bid);
+ log.debug("sending blob " + bid + " to " + client);
+ ctx.writeAndFlush(b);
+ observer.didSendBinariesBytes(clientID,
+ Math.max(0, (int) b.length()));
+ return;
} else {
log.warn("Unknown request {}, ignoring.", request);
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java Tue Dec 16 15:56:04 2014
@@ -1,179 +1,199 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby.store;
-
-
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-public class CommunicationObserver {
- private static final int MAX_CLIENT_STATISTICS = 10;
-
- private class CommunicationPartnerMBean implements ObservablePartnerMBean {
- private final ObjectName mbeanName;
- private final String clientName;
- public String lastRequest;
- public Date lastSeen;
- public String remoteAddress;
- public int remotePort;
- public long segmentsSent;
- public long segmentBytesSent;
-
- public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
- this.clientName = clientName;
- this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
- }
-
- public ObjectName getMBeanName() {
- return this.mbeanName;
- }
-
- @Override
- public String getName() {
- return this.clientName;
- }
-
- @Override
- public String getRemoteAddress() {
- return this.remoteAddress;
- }
-
- @Override
- public String getLastRequest() {
- return this.lastRequest;
- }
-
- @Override
- public int getRemotePort() {
- return this.remotePort;
- }
-
- @Override
- public String getLastSeenTimestamp() {
- return this.lastSeen == null ? null : this.lastSeen.toString();
- }
-
- @Override
- public long getTransferredSegments() {
- return this.segmentsSent;
- }
-
- @Override
- public long getTransferredSegmentBytes() {
- return this.segmentBytesSent;
- }
- }
-
- private static final Logger log = LoggerFactory
- .getLogger(CommunicationObserver.class);
-
- private final String identifier;
- private final Map<String, CommunicationPartnerMBean> partnerDetails;
-
- public CommunicationObserver(String myID) {
- this.identifier = myID;
- this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
- }
-
- private void unregister(CommunicationPartnerMBean m) {
- final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
- try {
- jmxServer.unregisterMBean(m.getMBeanName());
- }
- catch (Exception e) {
- log.error("error unregistering mbean for client '" + m.getName() + "'", e);
- }
- }
-
- public void unregister() {
- for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
- unregister(m);
- }
- }
-
- public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
- log.debug("got message '" + request + "' from client " + client);
- CommunicationPartnerMBean m = this.partnerDetails.get(client);
- boolean register = false;
- if (m == null) {
- cleanUp();
- m = new CommunicationPartnerMBean(client);
- m.remoteAddress = remote.getAddress().getHostAddress();
- m.remotePort = remote.getPort();
- register = true;
- }
- m.lastSeen = new Date();
- m.lastRequest = request;
- this.partnerDetails.put(client, m);
- if (register) {
- final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
- try {
- jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
- }
- catch (Exception e) {
- log.error("can register mbean for client '" + m.getName() + "'", e);
- }
- }
- }
-
- public void didSendSegmentBytes(String client, int size) {
- log.debug("did send segment with " + size + " bytes to client " + client);
- CommunicationPartnerMBean m = this.partnerDetails.get(client);
- m.segmentsSent++;
- m.segmentBytesSent += size;
- this.partnerDetails.put(client, m);
- }
-
- public String getID() {
- return this.identifier;
- }
-
- // helper
-
- private void cleanUp() {
- while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
- CommunicationPartnerMBean oldestEntry = oldestEntry();
- if (oldestEntry == null) return;
- log.info("housekeeping: removing statistics for " + oldestEntry.getName());
- unregister(oldestEntry);
- this.partnerDetails.remove(oldestEntry.getName());
- }
- }
-
- private CommunicationPartnerMBean oldestEntry() {
- CommunicationPartnerMBean ret = null;
- for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
- if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
- }
- return ret;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+
+
+import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CommunicationObserver {
+ private static final int MAX_CLIENT_STATISTICS = 10;
+
+ private class CommunicationPartnerMBean implements ObservablePartnerMBean {
+ private final ObjectName mbeanName;
+ private final String clientName;
+ public String lastRequest;
+ public Date lastSeen;
+ public String remoteAddress;
+ public int remotePort;
+ public long segmentsSent;
+ public long segmentBytesSent;
+ public long binariesSent;
+ public long binariesBytesSent;
+
+ public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
+ this.clientName = clientName;
+ this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
+ }
+
+ public ObjectName getMBeanName() {
+ return this.mbeanName;
+ }
+
+ @Override
+ public String getName() {
+ return this.clientName;
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return this.remoteAddress;
+ }
+
+ @Override
+ public String getLastRequest() {
+ return this.lastRequest;
+ }
+
+ @Override
+ public int getRemotePort() {
+ return this.remotePort;
+ }
+
+ @Override
+ public String getLastSeenTimestamp() {
+ return this.lastSeen == null ? null : this.lastSeen.toString();
+ }
+
+ @Override
+ public long getTransferredSegments() {
+ return this.segmentsSent;
+ }
+
+ @Override
+ public long getTransferredSegmentBytes() {
+ return this.segmentBytesSent;
+ }
+
+ @Override
+ public long getTransferredBinaries() {
+ return this.binariesSent;
+ }
+
+ @Override
+ public long getTransferredBinariesBytes() {
+ return this.binariesBytesSent;
+ }
+ }
+
+ private static final Logger log = LoggerFactory
+ .getLogger(CommunicationObserver.class);
+
+ private final String identifier;
+ private final Map<String, CommunicationPartnerMBean> partnerDetails;
+
+ public CommunicationObserver(String myID) {
+ this.identifier = myID;
+ this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
+ }
+
+ private void unregister(CommunicationPartnerMBean m) {
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.unregisterMBean(m.getMBeanName());
+ }
+ catch (Exception e) {
+ log.error("error unregistering mbean for client '" + m.getName() + "'", e);
+ }
+ }
+
+ public void unregister() {
+ for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+ unregister(m);
+ }
+ }
+
+ public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
+ log.debug("got message '" + request + "' from client " + client);
+ CommunicationPartnerMBean m = this.partnerDetails.get(client);
+ boolean register = false;
+ if (m == null) {
+ cleanUp();
+ m = new CommunicationPartnerMBean(client);
+ m.remoteAddress = remote.getAddress().getHostAddress();
+ m.remotePort = remote.getPort();
+ register = true;
+ }
+ m.lastSeen = new Date();
+ m.lastRequest = request;
+ this.partnerDetails.put(client, m);
+ if (register) {
+ final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
+ }
+ catch (Exception e) {
+ log.error("can register mbean for client '" + m.getName() + "'", e);
+ }
+ }
+ }
+
+ public void didSendSegmentBytes(String client, int size) {
+ log.debug("did send segment with " + size + " bytes to client " + client);
+ CommunicationPartnerMBean m = this.partnerDetails.get(client);
+ m.segmentsSent++;
+ m.segmentBytesSent += size;
+ this.partnerDetails.put(client, m);
+ }
+
+ public void didSendBinariesBytes(String client, int size) {
+ log.debug("did send binary with " + size + " bytes to client " + client);
+ CommunicationPartnerMBean m = this.partnerDetails.get(client);
+ m.binariesSent++;
+ m.binariesBytesSent += size;
+ this.partnerDetails.put(client, m);
+ }
+
+ public String getID() {
+ return this.identifier;
+ }
+
+ // helper
+
+ private void cleanUp() {
+ while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
+ CommunicationPartnerMBean oldestEntry = oldestEntry();
+ if (oldestEntry == null) return;
+ log.info("housekeeping: removing statistics for " + oldestEntry.getName());
+ unregister(oldestEntry);
+ this.partnerDetails.remove(oldestEntry.getName());
+ }
+ }
+
+ private CommunicationPartnerMBean oldestEntry() {
+ CommunicationPartnerMBean ret = null;
+ for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+ if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
+ }
+ return ret;
+ }
+}
Propchange: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java Tue Dec 16 15:56:04 2014
@@ -16,14 +16,19 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
public interface RemoteSegmentLoader {
Segment readSegment(String id);
+ Blob readBlob(String blobId);
+
void close();
boolean isClosed();
+ boolean isRunning();
+
}
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java Tue Dec 16 15:56:04 2014
@@ -144,7 +144,7 @@ public class StandbyStoreService {
Dictionary<Object, Object> dictionary = new Hashtable<Object, Object>();
dictionary.put("scheduler.period", interval);
dictionary.put("scheduler.concurrent", false);
- dictionary.put("scheduler.runOn", "SINGLE");
+ // dictionary.put("scheduler.runOn", "SINGLE");
syncReg = context.getBundleContext().registerService(
Runnable.class.getName(), sync, dictionary);
Copied: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java (from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java?p2=jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java&p1=jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java&r1=1644689&r2=1645987&rev=1645987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java Tue Dec 16 15:56:04 2014
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.jackrabbit.oak.plugins.segment.standby;
import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
Copied: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java (from r1644689, jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java?p2=jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java&p1=jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java&r1=1644689&r2=1645987&rev=1645987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java Tue Dec 16 15:56:04 2014
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.jackrabbit.oak.plugins.segment.standby;
import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java Tue Dec 16 15:56:04 2014
@@ -1,83 +1,85 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby;
-
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils;
-import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
-import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
-import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static junit.framework.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
-
-public class FailoverMultipleClientsTest extends TestBase {
-
- @Before
- public void setUp() throws Exception {
- setUpServerAndTwoClients();
- }
-
- @After
- public void after() {
- closeServerAndTwoClients();
- }
-
- @Test
- public void testMultipleClients() throws Exception {
- NodeStore store = new SegmentNodeStore(storeS);
- final StandbyServer server = new StandbyServer(port, storeS);
- server.start();
- SegmentTestUtils.addTestContent(store, "server");
- storeS.flush(); // this speeds up the test a little bit...
-
- StandbyClient cl1 = new StandbyClient("127.0.0.1", port, storeC);
- StandbyClient cl2 = new StandbyClient("127.0.0.1", port, storeC2);
-
- try {
- assertFalse("first client has invalid initial store!", storeS.getHead().equals(storeC.getHead()));
- assertFalse("second client has invalid initial store!", storeS.getHead().equals(storeC2.getHead()));
- assertEquals(storeC.getHead(), storeC2.getHead());
-
- cl1.run();
- cl2.run();
-
- assertEquals(storeS.getHead(), storeC.getHead());
- assertEquals(storeS.getHead(), storeC2.getHead());
-
- cl1.stop();
- SegmentTestUtils.addTestContent(store, "test");
- cl2.run();
-
- assertEquals(storeS.getHead(), storeC2.getHead());
- assertFalse("first client updated in stopped state!", storeS.getHead().equals(storeC.getHead()));
-
- cl1.start();
- assertEquals(storeS.getHead(), storeC.getHead());
- } finally {
- server.close();
- cl1.close();
- cl2.close();
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils;
+import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
+import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+public class FailoverMultipleClientsTest extends TestBase {
+
+ @Before
+ public void setUp() throws Exception {
+ setUpServerAndTwoClients();
+ }
+
+ @After
+ public void after() {
+ closeServerAndTwoClients();
+ }
+
+ @Test
+ public void testMultipleClients() throws Exception {
+ NodeStore store = new SegmentNodeStore(storeS);
+ final StandbyServer server = new StandbyServer(port, storeS);
+ server.start();
+ SegmentTestUtils.addTestContent(store, "server");
+ storeS.flush(); // this speeds up the test a little bit...
+
+ StandbyClient cl1 = new StandbyClient("127.0.0.1", port, storeC);
+ StandbyClient cl2 = new StandbyClient("127.0.0.1", port, storeC2);
+
+ try {
+ assertFalse("first client has invalid initial store!", storeS.getHead().equals(storeC.getHead()));
+ assertFalse("second client has invalid initial store!", storeS.getHead().equals(storeC2.getHead()));
+ assertEquals(storeC.getHead(), storeC2.getHead());
+
+ cl1.run();
+ cl2.run();
+
+ assertEquals(storeS.getHead(), storeC.getHead());
+ assertEquals(storeS.getHead(), storeC2.getHead());
+
+ cl1.stop();
+ SegmentTestUtils.addTestContent(store, "test");
+ cl1.run();
+ cl2.run();
+
+ assertEquals(storeS.getHead(), storeC2.getHead());
+ assertFalse("first client updated in stopped state!", storeS.getHead().equals(storeC.getHead()));
+
+ cl1.start();
+ cl1.run();
+ assertEquals(storeS.getHead(), storeC.getHead());
+ } finally {
+ server.close();
+ cl1.close();
+ cl2.close();
+ }
+ }
+
+}
Propchange: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/resources/logback-test.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/resources/logback-test.xml?rev=1645987&r1=1645986&r2=1645987&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/resources/logback-test.xml (original)
+++ jackrabbit/oak/branches/1.0/oak-tarmk-standby/src/test/resources/logback-test.xml Tue Dec 16 15:56:04 2014
@@ -22,6 +22,9 @@
</encoder>
</appender>
+ <logger name="org.apache.jackrabbit.oak.plugins.segment.standby" level="ERROR"/>
+ <logger name="org.apache.jackrabbit.oak.plugins.blob.datastore" level="ERROR"/>
+
<root level="INFO">
<appender-ref ref="console"/>
</root>