You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/09/10 18:27:49 UTC
bookkeeper git commit: BOOKKEEPER-930: Option to disable Bookie
networking
Repository: bookkeeper
Updated Branches:
refs/heads/master 28f23e80b -> 9db51b8d5
BOOKKEEPER-930: Option to disable Bookie networking
Author: eolivelli <eo...@gmail.com>
Reviewers: sijie@apache.org <si...@apache.org>
Closes #49 from eolivelli/BOOKKEEPER-930
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9db51b8d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9db51b8d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9db51b8d
Branch: refs/heads/master
Commit: 9db51b8d532d18485798d9dd96973c22450a0495
Parents: 28f23e8
Author: eolivelli <eo...@gmail.com>
Authored: Sat Sep 10 11:27:43 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Sat Sep 10 11:27:43 2016 -0700
----------------------------------------------------------------------
.../bookkeeper/conf/ServerConfiguration.java | 25 ++++-
.../bookkeeper/proto/BookieNettyServer.java | 98 +++++++-------------
.../apache/bookkeeper/proto/ChannelManager.java | 44 +++++++++
.../bookkeeper/proto/LocalBookiesRegistry.java | 4 +-
.../proto/NioServerSocketChannelManager.java | 73 +++++++++++++++
.../proto/PerChannelBookieClient.java | 4 +-
.../bookkeeper/proto/VMLocalChannelManager.java | 61 ++++++++++++
.../bookkeeper/proto/NetworkLessBookieTest.java | 79 ++++++++++++++++
8 files changed, 318 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 7d9b697..67e81cc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -114,6 +114,7 @@ public class ServerConfiguration extends AbstractConfiguration {
// registration.
protected final static String USE_HOST_NAME_AS_BOOKIE_ID = "useHostNameAsBookieID";
protected final static String ENABLE_LOCAL_TRANSPORT = "enableLocalTransport";
+ protected final static String DISABLE_SERVER_SOCKET_BIND = "disableServerSocketBind";
protected final static String SORTED_LEDGER_STORAGE_ENABLED = "sortedLedgerStorageEnabled";
protected final static String SKIP_LIST_SIZE_LIMIT = "skipListSizeLimit";
@@ -1561,7 +1562,7 @@ public class ServerConfiguration extends AbstractConfiguration {
}
/**
- * Get hwhether to use listen for local JVM clients. Defaults to false.
+ * Get whether to listen for local JVM clients. Defaults to false.
*
* @return true, then bookie will be listen for local JVM clients
*/
@@ -1582,6 +1583,28 @@ public class ServerConfiguration extends AbstractConfiguration {
return this;
}
+ /**
+ * Get whether to disable bind of server-side sockets. Defaults to false.
+ *
+ * @return true, then bookie will not listen for network connections
+ */
+ public boolean isDisableServerSocketBind() {
+ return getBoolean(DISABLE_SERVER_SOCKET_BIND, false);
+ }
+
+ /**
+ * Configure the bookie to disable bind on network interfaces,
+ * this bookie will be available only to BookKeeper clients executed on the local JVM
+ *
+ * @see #getDisableServerSocketBind
+ * @param disableServerSocketBind
+ * whether to disable binding on network interfaces
+ * @return server configuration
+ */
+ public ServerConfiguration setDisableServerSocketBind(boolean disableServerSocketBind) {
+ setProperty(DISABLE_SERVER_SOCKET_BIND, disableServerSocketBind);
+ return this;
+ }
/**
* Get the stats provider used by bookie.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 202a5e5..5fcc64e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -21,20 +21,14 @@
package org.apache.bookkeeper.proto;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.auth.BookieAuthProvider;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -44,18 +38,14 @@ import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.google.protobuf.ExtensionRegistry;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
-import org.jboss.netty.channel.local.LocalAddress;
+import java.util.ArrayList;
+import java.util.List;
/**
* Netty server for serving bookie requests
@@ -65,21 +55,19 @@ class BookieNettyServer {
final static int maxMessageSize = 0xfffff;
final ServerConfiguration conf;
- final ChannelFactory serverChannelFactory;
- final ChannelFactory jvmServerChannelFactory;
+ final List<ChannelManager> channels = new ArrayList<>();
final RequestProcessor requestProcessor;
final ChannelGroup allChannels = new CleanupChannelGroup();
final AtomicBoolean isRunning = new AtomicBoolean(false);
- Object suspensionLock = new Object();
+ final Object suspensionLock = new Object();
boolean suspended = false;
- final BookieSocketAddress bookieAddress;
final BookieAuthProvider.Factory authProviderFactory;
final BookieProtoEncoding.ResponseEncoder responseEncoder;
final BookieProtoEncoding.RequestDecoder requestDecoder;
BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
- throws IOException, KeeperException, InterruptedException, BookieException {
+ throws IOException, KeeperException, InterruptedException, BookieException {
this.conf = conf;
this.requestProcessor = processor;
@@ -89,25 +77,25 @@ class BookieNettyServer {
responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry);
requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
- ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
- String base = "bookie-" + conf.getBookiePort() + "-netty";
- serverChannelFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()),
- Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build()));
+ if (!conf.isDisableServerSocketBind()) {
+ channels.add(new NioServerSocketChannelManager());
+ }
if (conf.isEnableLocalTransport()) {
- jvmServerChannelFactory = new DefaultLocalServerChannelFactory();
- } else {
- jvmServerChannelFactory = null;
+ channels.add(new VMLocalChannelManager());
}
- bookieAddress = Bookie.getBookieAddress(conf);
- InetSocketAddress bindAddress;
- if (conf.getListeningInterface() == null) {
- // listen on all interfaces
- bindAddress = new InetSocketAddress(conf.getBookiePort());
- } else {
- bindAddress = bookieAddress.getSocketAddress();
+ try {
+ for (ChannelManager channel : channels) {
+ Channel nettyChannel = channel.start(conf, new BookiePipelineFactory());
+ allChannels.add(nettyChannel);
+ }
+ } catch (IOException bindError) {
+ // clean up all the channels, if this constructor throws an exception the caller code will
+ // not be able to call close(), leading to a resource leak
+ for (ChannelManager channel : channels) {
+ channel.close();
+ }
+ throw bindError;
}
- listenOn(bindAddress, bookieAddress);
}
boolean isRunning() {
@@ -131,44 +119,21 @@ class BookieNettyServer {
}
}
- private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) {
- ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
- bootstrap.setPipelineFactory(new BookiePipelineFactory());
- bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay());
- bootstrap.setOption("child.soLinger", 2);
-
- Channel listen = bootstrap.bind(address);
- allChannels.add(listen);
-
- if (conf.isEnableLocalTransport()) {
- ServerBootstrap jvmbootstrap = new ServerBootstrap(jvmServerChannelFactory);
- jvmbootstrap.setPipelineFactory(new BookiePipelineFactory());
-
- // use the same address 'name', so clients can find local Bookie still discovering them using ZK
- Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress());
- allChannels.add(jvmlisten);
- LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
- }
- }
-
void start() {
isRunning.set(true);
}
void shutdown() {
LOG.info("Shutting down BookieNettyServer");
- if (conf.isEnableLocalTransport()) {
- LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
- }
isRunning.set(false);
allChannels.close().awaitUninterruptibly();
- serverChannelFactory.releaseExternalResources();
- if (conf.isEnableLocalTransport()) {
- jvmServerChannelFactory.releaseExternalResources();
+ for (ChannelManager channel : channels) {
+ channel.close();
}
}
- private class BookiePipelineFactory implements ChannelPipelineFactory {
+ class BookiePipelineFactory implements ChannelPipelineFactory {
+
public ChannelPipeline getPipeline() throws Exception {
synchronized (suspensionLock) {
while (suspended) {
@@ -177,16 +142,16 @@ class BookieNettyServer {
}
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("lengthbaseddecoder",
- new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
+ new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("bookieProtoDecoder", requestDecoder);
pipeline.addLast("bookieProtoEncoder", responseEncoder);
pipeline.addLast("bookieAuthHandler",
- new AuthHandler.ServerSideHandler(authProviderFactory));
+ new AuthHandler.ServerSideHandler(authProviderFactory));
- SimpleChannelHandler requestHandler = isRunning.get() ?
- new BookieRequestHandler(conf, requestProcessor, allChannels)
+ SimpleChannelHandler requestHandler = isRunning.get()
+ ? new BookieRequestHandler(conf, requestProcessor, allChannels)
: new RejectRequestHandler();
pipeline.addLast("bookieRequestHandler", requestHandler);
@@ -204,6 +169,7 @@ class BookieNettyServer {
}
private static class CleanupChannelGroup extends DefaultChannelGroup {
+
private AtomicBoolean closed = new AtomicBoolean(false);
CleanupChannelGroup() {
@@ -230,9 +196,9 @@ class BookieNettyServer {
if (!(o instanceof CleanupChannelGroup)) {
return false;
}
- CleanupChannelGroup other = (CleanupChannelGroup)o;
+ CleanupChannelGroup other = (CleanupChannelGroup) o;
return other.closed.get() == closed.get()
- && super.equals(other);
+ && super.equals(other);
}
@Override
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
new file mode 100644
index 0000000..15f00db
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Manages the lifycycle of a communication Channel
+ * @author enrico.olivelli
+ */
+public abstract class ChannelManager {
+
+ /**
+ * Boots the Channel
+ * @param conf Bookie Configuration
+ * @param channelPipelineFactory Netty Pipeline Factory
+ * @param bookieAddress The actual address to listen on
+ * @return the channel which is listening for incoming connections
+ * @throws IOException
+ */
+ public abstract Channel start(ServerConfiguration conf, ChannelPipelineFactory channelPipelineFactory) throws IOException;
+
+ /**
+ * Releases all resources
+ */
+ public abstract void close();
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
index f123aa6..0dd2aa3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
@@ -35,7 +35,9 @@ class LocalBookiesRegistry {
localBookiesRegistry.put(address,Boolean.TRUE);
}
static void unregisterLocalBookieAddress(BookieSocketAddress address) {
- localBookiesRegistry.remove(address);
+ if (address!= null) {
+ localBookiesRegistry.remove(address);
+ }
}
static boolean isLocalBookie(BookieSocketAddress address) {
return localBookiesRegistry.containsKey(address);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
new file mode 100644
index 0000000..925d677
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * Manages a NioServerSocketChannel channel
+ *
+ * @author enrico.olivelli
+ */
+public class NioServerSocketChannelManager extends ChannelManager {
+
+ private ChannelFactory channelFactory;
+
+ @Override
+ public Channel start(ServerConfiguration conf, ChannelPipelineFactory bookiePipelineFactory) throws IOException {
+ BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
+ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+ String base = "bookie-" + conf.getBookiePort() + "-netty";
+ this.channelFactory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()),
+ Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build()));
+
+ ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
+ bootstrap.setPipelineFactory(bookiePipelineFactory);
+ bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay());
+ bootstrap.setOption("child.soLinger", 2);
+
+ InetSocketAddress bindAddress;
+ if (conf.getListeningInterface() == null) {
+ // listen on all interfaces
+ bindAddress = new InetSocketAddress(conf.getBookiePort());
+ } else {
+ bindAddress = bookieAddress.getSocketAddress();
+ }
+
+ Channel listen = bootstrap.bind(bindAddress);
+ return listen;
+ }
+
+ @Override
+ public void close() {
+ if (channelFactory != null) {
+ channelFactory.releaseExternalResources();
+ }
+ channelFactory = null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 62f55ea..21b16fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -588,7 +588,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
public void safeRun() {
String bAddress = "null";
Channel c = channel;
- if (c != null) {
+ if (c != null && c.getRemoteAddress() != null) {
bAddress = c.getRemoteAddress().toString();
}
@@ -620,7 +620,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
public void safeRun() {
String bAddress = "null";
Channel c = channel;
- if(c != null) {
+ if(c != null && c.getRemoteAddress() != null) {
bAddress = c.getRemoteAddress().toString();
}
LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}",
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
new file mode 100644
index 0000000..03881b5
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
+
+/**
+ * Manages VM-local channels
+ *
+ * @author enrico.olivelli
+ */
+public class VMLocalChannelManager extends ChannelManager {
+
+ private ChannelFactory channelFactory;
+ private BookieSocketAddress bookieAddress;
+
+ @Override
+ public Channel start(ServerConfiguration conf, ChannelPipelineFactory bookiePipelineFactory) throws IOException {
+ BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
+ this.channelFactory = new DefaultLocalServerChannelFactory();
+ this.bookieAddress = bookieAddress;
+ ServerBootstrap jvmbootstrap = new ServerBootstrap(channelFactory);
+ jvmbootstrap.setPipelineFactory(bookiePipelineFactory);
+
+ // use the same address 'name', so clients can find local Bookie still discovering them using ZK
+ Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress());
+ LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
+ return jvmlisten;
+ }
+
+ @Override
+ public void close() {
+ LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
+ if (channelFactory != null) {
+ channelFactory.releaseExternalResources();
+ }
+ channelFactory = null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
new file mode 100644
index 0000000..5a1f7fc
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
@@ -0,0 +1,79 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.bookkeeper.proto;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BookKeeper;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BaseTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests of the main BookKeeper client using networkless comunication
+ */
+public class NetworkLessBookieTest extends BaseTestCase {
+
+ protected ServerConfiguration newServerConfiguration() throws Exception {
+ return super
+ .newServerConfiguration()
+ .setDisableServerSocketBind(true)
+ .setEnableLocalTransport(true);
+ }
+
+ DigestType digestType;
+
+ public NetworkLessBookieTest(DigestType digestType) {
+ super(4);
+ this.digestType=digestType;
+ }
+
+ @Test
+ public void testUseLocalBookie() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setZkServers(zkUtil.getZooKeeperConnectString())
+ .setZkTimeout(20000);
+
+ CountDownLatch l = new CountDownLatch(1);
+ zkUtil.sleepServer(5, l);
+ l.await();
+
+ try (BookKeeper bkc = new BookKeeper(conf);) {
+ try (LedgerHandle h = bkc.createLedger(1,1,digestType, "testPasswd".getBytes());) {
+ h.addEntry("test".getBytes());
+ }
+ }
+
+ for (BookieServer bk : bs) {
+ for (ChannelManager channel : bk.nettyServer.channels) {
+ if (! (channel instanceof VMLocalChannelManager)) {
+ Assert.fail();
+ }
+ }
+ }
+ }
+
+}