You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/11/10 11:55:03 UTC

svn commit: r1637815 - in /jackrabbit/oak/trunk/oak-tarmk-standby/src: main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/ test/java/org/apache/jackrabbit/oak/plugins/segment/standby/

Author: alexparvulescu
Date: Mon Nov 10 10:55:02 2014
New Revision: 1637815

URL: http://svn.apache.org/r1637815
Log:
OAK-2260 TarMK Cold Standby can corrupt bulk segments

Modified:
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverTest.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/TestBase.java   (contents, props changed)

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1637815&r1=1637814&r2=1637815&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java Mon Nov 10 10:55:02 2014
@@ -133,7 +133,7 @@ public class SegmentLoaderHandler extend
     @Override
     public Segment readSegment(final String id) {
         ctx.writeAndFlush(newGetSegmentReq(this.clientID, id));
-        return getSegment();
+        return getSegment(id);
     }
 
     @Override
@@ -145,15 +145,18 @@ public class SegmentLoaderHandler extend
 
     // implementation of RemoteSegmentLoader
 
-    public Segment getSegment() {
+    public Segment getSegment(final String id) {
         boolean interrupted = false;
         try {
             for (;;) {
                 try {
-                    // log.debug("polling segment");
                     Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
-                    // log.debug("returning segment " + s.getSegmentId());
-                    return s;
+                    if (s == null) {
+                        return null;
+                    }
+                    if (s.getSegmentId().toString().equals(id)) {
+                        return s;
+                    }
                 } catch (InterruptedException ignore) {
                     interrupted = true;
                 }

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverTest.java?rev=1637815&r1=1637814&r2=1637815&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverTest.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverTest.java Mon Nov 10 10:55:02 2014
@@ -18,21 +18,35 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.standby;
 
-import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.IOException;
+import java.util.Random;
 
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
 import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.io.ByteStreams;
+
 public class FailoverTest extends TestBase {
 
     @Before
@@ -47,22 +61,54 @@ public class FailoverTest extends TestBa
 
     @Test
     public void testFailover() throws Exception {
+        final int mb = 1 * 1024 * 1024;
+        final int blobSize = 5 * mb;
+        FileStore primary = getPrimary();
+        FileStore secondary = getSecondary();
 
-        NodeStore store = new SegmentNodeStore(storeS);
-        final StandbyServer server = new StandbyServer(port, storeS);
+        NodeStore store = new SegmentNodeStore(primary);
+        final StandbyServer server = new StandbyServer(getPort(), primary);
         server.start();
-        addTestContent(store, "server");
+        byte[] data = addTestContent(store, "server", blobSize);
 
-        StandbyClient cl = new StandbyClient("127.0.0.1", port, storeC);
+        StandbyClient cl = new StandbyClient("127.0.0.1", getPort(), secondary);
         cl.run();
 
         try {
-            assertEquals(storeS.getHead(), storeC.getHead());
+            assertEquals(primary.getHead(), secondary.getHead());
         } finally {
             server.close();
             cl.close();
         }
 
+        assertTrue(primary.size() > blobSize);
+        assertTrue(secondary.size() > blobSize);
+
+        PropertyState ps = secondary.getHead().getChildNode("root")
+                .getChildNode("server").getProperty("testBlob");
+        assertNotNull(ps);
+        assertEquals(Type.BINARY.tag(), ps.getType().tag());
+        Blob b = ps.getValue(Type.BINARY);
+        assertEquals(blobSize, b.length());
+
+        byte[] testData = new byte[blobSize];
+        ByteStreams.readFully(b.getNewStream(), testData);
+        assertArrayEquals(data, testData);
+
+    }
+
+    private static byte[] addTestContent(NodeStore store, String child, int size)
+            throws CommitFailedException, IOException {
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child(child).setProperty("ts", System.currentTimeMillis());
+
+        byte[] data = new byte[size];
+        new Random().nextBytes(data);
+        Blob blob = store.createBlob(new ByteArrayInputStream(data));
+        builder.child(child).setProperty("testBlob", blob);
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        return data;
     }
 
     public static void main(String[] args) throws Exception {

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/TestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/TestBase.java?rev=1637815&r1=1637814&r2=1637815&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/TestBase.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/TestBase.java Mon Nov 10 10:55:02 2014
@@ -1,84 +1,104 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
-
-public class TestBase {
-    int port = Integer.valueOf(System.getProperty("standby.server.port", "52808"));
-    final static String LOCALHOST = "127.0.0.1";
-
-    File directoryS;
-    FileStore storeS;
-
-    File directoryC;
-    FileStore storeC;
-
-    File directoryC2;
-    FileStore storeC2;
-
-    /*
-     Java 6 on Windows doesn't support dual IP stacks, so we will skip our IPv6
-     tests.
-    */
-    protected final boolean noDualStackSupport = SystemUtils.IS_OS_WINDOWS && SystemUtils.IS_JAVA_1_6;
-
-    public void setUpServerAndClient() throws IOException {
-        // server
-        directoryS = createTmpTargetDir("FailoverServerTest");
-        storeS = new FileStore(directoryS, 1, false);
-
-        // client
-        directoryC = createTmpTargetDir("FailoverClientTest");
-        storeC = new FileStore(directoryC, 1, false);
-    }
-
-    public void setUpServerAndTwoClients() throws Exception {
-        setUpServerAndClient();
-
-        directoryC2 = createTmpTargetDir("FailoverClient2Test");
-        storeC2 = new FileStore(directoryC2, 1, false);
-    }
-
-    public void closeServerAndClient() {
-        storeS.close();
-        storeC.close();
-        try {
-            FileUtils.deleteDirectory(directoryS);
-            FileUtils.deleteDirectory(directoryC);
-        } catch (IOException e) {
-        }
-    }
-
-    public void closeServerAndTwoClients() {
-        closeServerAndClient();
-        storeC2.close();
-        try {
-            FileUtils.deleteDirectory(directoryC2);
-        } catch (IOException e) {
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+
+public class TestBase {
+    int port = Integer.valueOf(System.getProperty("standby.server.port", "52808"));
+    final static String LOCALHOST = "127.0.0.1";
+
+    File directoryS;
+    FileStore storeS;
+
+    File directoryC;
+    FileStore storeC;
+
+    File directoryC2;
+    FileStore storeC2;
+
+    /*
+     Java 6 on Windows doesn't support dual IP stacks, so we will skip our IPv6
+     tests.
+    */
+    protected final boolean noDualStackSupport = SystemUtils.IS_OS_WINDOWS && SystemUtils.IS_JAVA_1_6;
+
+    public void setUpServerAndClient() throws IOException {
+        // server
+        directoryS = createTmpTargetDir("FailoverServerTest");
+        storeS = setupPrimary(directoryS);
+
+        // client
+        directoryC = createTmpTargetDir("FailoverClientTest");
+        storeC = setupSecondary(directoryC);
+    }
+
+    protected FileStore setupPrimary(File directory) throws IOException {
+        return new FileStore(directory, 1, false);
+    }
+
+    protected FileStore getPrimary() {
+        return storeS;
+    }
+
+    protected FileStore setupSecondary(File directory) throws IOException {
+        return new FileStore(directoryC, 1, false);
+    }
+
+    protected FileStore getSecondary() {
+        return storeC;
+    }
+
+    protected int getPort() {
+        return port;
+    }
+
+    public void setUpServerAndTwoClients() throws Exception {
+        setUpServerAndClient();
+
+        directoryC2 = createTmpTargetDir("FailoverClient2Test");
+        storeC2 = new FileStore(directoryC2, 1, false);
+    }
+
+    public void closeServerAndClient() {
+        storeS.close();
+        storeC.close();
+        try {
+            FileUtils.deleteDirectory(directoryS);
+            FileUtils.deleteDirectory(directoryC);
+        } catch (IOException e) {
+        }
+    }
+
+    public void closeServerAndTwoClients() {
+        closeServerAndClient();
+        storeC2.close();
+        try {
+            FileUtils.deleteDirectory(directoryC2);
+        } catch (IOException e) {
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/TestBase.java
------------------------------------------------------------------------------
    svn:eol-style = native