You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/05 07:02:15 UTC

[GitHub] sijie closed pull request #1298: newOpenLedgerOp and ReadHandle implementation for MockBookKeeper

sijie closed pull request #1298: newOpenLedgerOp and ReadHandle implementation for MockBookKeeper
URL: https://github.com/apache/bookkeeper/pull/1298
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff once such a PR is merged, the diff is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise show it after the merge:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a9a85bc03..2d3446080 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -120,6 +120,12 @@
      */
     public static final long INVALID_ENTRY_ID = BookieProtocol.INVALID_ENTRY_ID;
 
+    /**
+     * Invalid ledger id. Ledger IDs must be greater than or equal to 0.
+     * Large negative used to make it easy to spot in logs if erroneously used.
+     */
+    public static final long INVALID_LEDGER_ID = -0xABCDABCDL;
+
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
     final int maxAllowedEnsembleChanges;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 33cfaf266..c3a69bd32 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -33,8 +33,8 @@
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncOpenCallback;
-import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
@@ -225,43 +225,14 @@ void openComplete(int rc, LedgerHandle lh) {
         cb.openComplete(rc, lh, ctx);
     }
 
-    static final class OpenBuilderImpl implements OpenBuilder {
+    static final class OpenBuilderImpl extends OpenBuilderBase {
 
-        private boolean builderRecovery = false;
-        private Long builderLedgerId;
-        private byte[] builderPassword;
-        private org.apache.bookkeeper.client.api.DigestType builderDigestType =
-            org.apache.bookkeeper.client.api.DigestType.CRC32;
         private final BookKeeper bk;
 
         OpenBuilderImpl(BookKeeper bookkeeper) {
             this.bk = bookkeeper;
         }
 
-        @Override
-        public OpenBuilder withLedgerId(long ledgerId) {
-            this.builderLedgerId = ledgerId;
-            return this;
-        }
-
-        @Override
-        public OpenBuilder withRecovery(boolean recovery) {
-            this.builderRecovery = recovery;
-            return this;
-        }
-
-        @Override
-        public OpenBuilder withPassword(byte[] password) {
-            this.builderPassword = password;
-            return this;
-        }
-
-        @Override
-        public OpenBuilder withDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
-            this.builderDigestType = digestType;
-            return this;
-        }
-
         @Override
         public CompletableFuture<ReadHandle> execute() {
             CompletableFuture<ReadHandle> future = new CompletableFuture<>();
@@ -270,23 +241,14 @@ public OpenBuilder withDigestType(org.apache.bookkeeper.client.api.DigestType di
             return future;
         }
 
-        private boolean validate() {
-            if (builderLedgerId == null || builderLedgerId < 0) {
-                LOG.error("invalid ledgerId {} < 0", builderLedgerId);
-                return false;
-            }
-            return true;
-        }
-
         private void open(OpenCallback cb) {
-
             if (!validate()) {
                 cb.openComplete(BKException.Code.NoSuchLedgerExistsException, null, null);
                 return;
             }
 
-            LedgerOpenOp op = new LedgerOpenOp(bk, builderLedgerId, fromApiDigestType(builderDigestType),
-                builderPassword, cb, null);
+            LedgerOpenOp op = new LedgerOpenOp(bk, ledgerId, fromApiDigestType(digestType),
+                                               password, cb, null);
             ReentrantReadWriteLock closeLock = bk.getCloseLock();
             closeLock.readLock().lock();
             try {
@@ -294,7 +256,7 @@ private void open(OpenCallback cb) {
                     cb.openComplete(BKException.Code.ClientClosedException, null, null);
                     return;
                 }
-                if (builderRecovery) {
+                if (recovery) {
                     op.initiate();
                 } else {
                     op.initiateWithoutRecovery();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/OpenBuilderBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/OpenBuilderBase.java
new file mode 100644
index 000000000..b22effc3d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/OpenBuilderBase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client.impl;
+
+import java.util.Arrays;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for open builders which does the mundane builder stuff.
+ */
+public abstract class OpenBuilderBase implements OpenBuilder {
+    static final Logger LOG = LoggerFactory.getLogger(OpenBuilderBase.class);
+
+    protected boolean recovery = false;
+    protected long ledgerId = LedgerHandle.INVALID_LEDGER_ID;
+    protected byte[] password;
+    protected DigestType digestType = DigestType.CRC32;
+
+    @Override
+    public OpenBuilder withLedgerId(long ledgerId) {
+        this.ledgerId = ledgerId;
+        return this;
+    }
+
+    @Override
+    public OpenBuilder withRecovery(boolean recovery) {
+        this.recovery = recovery;
+        return this;
+    }
+
+    @Override
+    public OpenBuilder withPassword(byte[] password) {
+        this.password = Arrays.copyOf(password, password.length);
+        return this;
+    }
+
+    @Override
+    public OpenBuilder withDigestType(DigestType digestType) {
+        this.digestType = digestType;
+        return this;
+    }
+
+    protected boolean validate() {
+        if (ledgerId < 0) {
+            LOG.error("invalid ledgerId {} < 0", ledgerId);
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index c642e57a7..3b63cfd37 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -24,6 +24,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -34,6 +35,9 @@
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -210,6 +214,42 @@ public void close() throws InterruptedException, BKException {
         shutdown();
     }
 
+    @Override
+    public OpenBuilder newOpenLedgerOp() {
+        return new OpenBuilderBase() {
+            @Override
+            public CompletableFuture<ReadHandle> execute() {
+                CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
+
+                if (!validate()) {
+                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
+                    return promise;
+                } else if (getProgrammedFailStatus()) {
+                    if (failReturnCode != BkTimeoutOperation) {
+                        promise.completeExceptionally(BKException.create(failReturnCode));
+                    }
+                    return promise;
+                } else if (stopped.get()) {
+                    promise.completeExceptionally(new BKException.BKClientClosedException());
+                    return promise;
+                }
+
+                MockLedgerHandle lh = ledgers.get(ledgerId);
+                if (lh == null) {
+                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
+                } else if (lh.digest != DigestType.fromApiDigestType(digestType)) {
+                    promise.completeExceptionally(new BKException.BKDigestMatchException());
+                } else if (!Arrays.equals(lh.passwd, password)) {
+                    promise.completeExceptionally(new BKException.BKUnauthorizedAccessException());
+                } else {
+                    promise.complete(new MockReadHandle(MockBookKeeper.this, ledgerId,
+                                                        lh.getLedgerMetadata(), lh.entries));
+                }
+                return promise;
+            }
+        };
+    }
+
     public void shutdown() {
         try {
             super.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index c914b1031..6b4bea4d9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -30,13 +30,18 @@
 import java.util.EnumSet;
 import java.util.Enumeration;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,11 +50,12 @@
  */
 public class MockLedgerHandle extends LedgerHandle {
 
-    final ArrayList<LedgerEntry> entries = Lists.newArrayList();
+    final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
     final MockBookKeeper bk;
     final long id;
     final DigestType digest;
     final byte[] passwd;
+    final ReadHandle readHandle;
     long lastEntry = -1;
     boolean fenced = false;
 
@@ -60,6 +66,8 @@
         this.id = id;
         this.digest = digest;
         this.passwd = Arrays.copyOf(passwd, passwd.length);
+
+        readHandle = new MockReadHandle(bk, id, getLedgerMetadata(), entries);
     }
 
     @Override
@@ -100,7 +108,7 @@ public void run() {
                 final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
                 long entryId = firstEntry;
                 while (entryId <= lastEntry && entryId < entries.size()) {
-                    seq.add(entries.get((int) entryId++));
+                    seq.add(new LedgerEntry(entries.get((int) entryId++)));
                 }
 
                 log.debug("Entries read: {}", seq);
@@ -142,7 +150,7 @@ public long addEntry(byte[] data) throws InterruptedException, BKException {
         }
 
         lastEntry = entries.size();
-        entries.add(new MockLedgerEntry(ledgerId, lastEntry, data));
+        entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data)));
         return lastEntry;
     }
 
@@ -192,8 +200,8 @@ public void run() {
                     lastEntry = entries.size();
                     byte[] storedData = new byte[data.readableBytes()];
                     data.readBytes(storedData);
-                    LedgerEntry entry = new MockLedgerEntry(ledgerId, lastEntry, storedData);
-                    entries.add(entry);
+                    entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
+                                                       storedData.length, Unpooled.wrappedBuffer(storedData)));
                     data.release();
                     cb.addComplete(0, MockLedgerHandle.this, lastEntry, ctx);
                 }
@@ -214,13 +222,47 @@ public long getLastAddConfirmed() {
     @Override
     public long getLength() {
         long length = 0;
-        for (LedgerEntry entry : entries) {
+        for (LedgerEntryImpl entry : entries) {
             length += entry.getLength();
         }
 
         return length;
     }
 
+
+    // ReadHandle interface
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+        return readHandle.readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+        return readHandle.readUnconfirmedAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return readHandle.readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readHandle.tryReadLastAddConfirmedAsync();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return readHandle.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
+        return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(MockLedgerHandle.class);
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
new file mode 100644
index 000000000..e3e3ffeda
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+
+/**
+ * Mock implementation of ReadHandle.
+ */
+@Slf4j
+class MockReadHandle implements ReadHandle {
+    private final MockBookKeeper bk;
+    private final long ledgerId;
+    private final LedgerMetadata metadata;
+    private final List<LedgerEntryImpl> entries;
+
+    MockReadHandle(MockBookKeeper bk, long ledgerId, LedgerMetadata metadata, List<LedgerEntryImpl> entries) {
+        this.bk = bk;
+        this.ledgerId = ledgerId;
+        this.metadata = metadata;
+        this.entries = entries;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+        if (bk.isStopped()) {
+            promise.completeExceptionally(new BKException.BKClientClosedException());
+            return promise;
+        }
+
+        bk.executor.execute(() -> {
+                if (bk.getProgrammedFailStatus()) {
+                    promise.completeExceptionally(BKException.create(bk.failReturnCode));
+                    return;
+                } else if (bk.isStopped()) {
+                    promise.completeExceptionally(new BKException.BKClientClosedException());
+                    return;
+                }
+
+                log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
+                List<LedgerEntry> seq = new ArrayList<>();
+                long entryId = firstEntry;
+                while (entryId <= lastEntry && entryId < entries.size()) {
+                    seq.add(entries.get((int) entryId++).duplicate());
+                }
+                log.debug("Entries read: {}", seq);
+                promise.complete(LedgerEntriesImpl.create(seq));
+            });
+        return promise;
+
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+        return readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return entries.get(entries.size() - 1).getEntryId();
+    }
+
+    @Override
+    public long getLength() {
+        long length = 0;
+        for (LedgerEntryImpl entry : entries) {
+            length += entry.getLength();
+        }
+
+        return length;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return metadata.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
+        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
+        promise.completeExceptionally(new UnsupportedOperationException("Long poll not implemented"));
+        return promise;
+    }
+
+    // Handle interface
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return metadata;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services