You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/08/24 12:36:39 UTC

[bookkeeper] branch master updated: Add mock registration client

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3672a28  Add mock registration client
3672a28 is described below

commit 3672a280e9b1f776514cba7066e5100aa7a32d08
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Aug 24 14:36:31 2018 +0200

    Add mock registration client
    
    For testing the client without having to boot zookeeper.
    
    Author: Ivan Kelly <iv...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1622 from ivankelly/mock-reg-client
---
 .../discover/MockRegistrationClient.java           | 157 +++++++++++++++++++++
 1 file changed, 157 insertions(+)

diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
new file mode 100644
index 0000000..40178b9
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/MockRegistrationClient.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Versioned;
+
+/**
+ * Mock implementation of registration client.
+ * All actions take place in a single thread executor, so they are async
+ * w.r.t. the caller.
+ */
+public class MockRegistrationClient implements RegistrationClient {
+    final ExecutorService executor;
+    private long currentVersion = 0;
+    private Set<BookieSocketAddress> bookies = new HashSet<BookieSocketAddress>();
+    private Set<BookieSocketAddress> readOnlyBookies = new HashSet<BookieSocketAddress>();
+    private Set<RegistrationListener> bookieWatchers = new HashSet<RegistrationListener>();
+    private Set<RegistrationListener> readOnlyBookieWatchers = new HashSet<RegistrationListener>();
+
+    public MockRegistrationClient() {
+        this.executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "MockRegistrationClient"));
+    }
+
+    @Override
+    public void close() {
+        executor.shutdownNow();
+    }
+
+    private static Versioned<Set<BookieSocketAddress>> versioned(Set<BookieSocketAddress> bookies, long version) {
+        return new Versioned<>(Collections.unmodifiableSet(bookies), new LongVersion(version));
+    }
+
+    CompletableFuture<Void> addBookies(BookieSocketAddress... bookies) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                currentVersion++;
+                for (BookieSocketAddress b : bookies) {
+                    this.bookies.add(b);
+                }
+                bookieWatchers.forEach(w -> w.onBookiesChanged(versioned(this.bookies, currentVersion)));
+                promise.complete(null);
+            });
+        return promise;
+    }
+
+    CompletableFuture<Void> removeBookies(BookieSocketAddress... bookies) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                currentVersion++;
+                for (BookieSocketAddress b : bookies) {
+                    this.bookies.add(b);
+                }
+                bookieWatchers.forEach(w -> w.onBookiesChanged(versioned(this.bookies, currentVersion)));
+                promise.complete(null);
+            });
+        return promise;
+    }
+
+    CompletableFuture<Void> addReadOnlyBookies(BookieSocketAddress... bookies) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                currentVersion++;
+                for (BookieSocketAddress b : bookies) {
+                    this.readOnlyBookies.add(b);
+                }
+                readOnlyBookieWatchers.forEach(w -> w.onBookiesChanged(versioned(readOnlyBookies, currentVersion)));
+                promise.complete(null);
+            });
+        return promise;
+    }
+
+    CompletableFuture<Void> removeReadOnlyBookies(BookieSocketAddress... bookies) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                currentVersion++;
+                for (BookieSocketAddress b : bookies) {
+                    this.readOnlyBookies.add(b);
+                }
+                readOnlyBookieWatchers.forEach(w -> w.onBookiesChanged(versioned(readOnlyBookies, currentVersion)));
+                promise.complete(null);
+            });
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Set<BookieSocketAddress>>> getWritableBookies() {
+        CompletableFuture<Versioned<Set<BookieSocketAddress>>> promise = new CompletableFuture<>();
+        executor.submit(() -> promise.complete(versioned(bookies, currentVersion)));
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Set<BookieSocketAddress>>> getReadOnlyBookies() {
+        CompletableFuture<Versioned<Set<BookieSocketAddress>>> promise = new CompletableFuture<>();
+        executor.submit(() -> promise.complete(versioned(readOnlyBookies, currentVersion)));
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<Void> watchWritableBookies(RegistrationListener listener) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                bookieWatchers.add(listener);
+                promise.complete(null);
+            });
+        return promise;
+    }
+
+    @Override
+    public void unwatchWritableBookies(RegistrationListener listener) {
+        executor.submit(() -> {
+                bookieWatchers.remove(listener);
+            });
+    }
+
+    @Override
+    public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener listener) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+                readOnlyBookieWatchers.add(listener);
+                promise.complete(null);
+            });
+        return promise;
+    }
+
+    @Override
+    public void unwatchReadOnlyBookies(RegistrationListener listener) {
+        executor.submit(() -> {
+                readOnlyBookieWatchers.remove(listener);
+            });
+    }
+}