You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2016/04/05 18:47:36 UTC
bookkeeper git commit: BOOKKEEPER-896: VM-local transport
Repository: bookkeeper
Updated Branches:
refs/heads/master d607b366b -> e5939ed58
BOOKKEEPER-896: VM-local transport
Author: eolivelli <eo...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org>, Matteo Merli <mm...@apache.org>
Closes #21 from eolivelli/BOOKKEEPER-896
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e5939ed5
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e5939ed5
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e5939ed5
Branch: refs/heads/master
Commit: e5939ed587d81212fb032a37b2b83401359fa7fc
Parents: d607b36
Author: eolivelli <eo...@gmail.com>
Authored: Tue Apr 5 09:47:07 2016 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Tue Apr 5 09:47:07 2016 -0700
----------------------------------------------------------------------
.../bookkeeper/conf/ServerConfiguration.java | 24 +++++++
.../bookkeeper/net/BookieSocketAddress.java | 8 +++
.../bookkeeper/proto/BookieNettyServer.java | 33 +++++++++-
.../bookkeeper/proto/LocalBookiesRegistry.java | 44 +++++++++++++
.../proto/PerChannelBookieClient.java | 20 ++++--
.../bookkeeper/client/LocalBookKeeperTest.java | 67 ++++++++++++++++++++
6 files changed, 189 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d770650..686104d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -110,6 +110,7 @@ public class ServerConfiguration extends AbstractConfiguration {
// Whether the bookie should use its hostname or ipaddress for the
// registration.
protected final static String USE_HOST_NAME_AS_BOOKIE_ID = "useHostNameAsBookieID";
+ protected final static String ENABLE_LOCAL_TRANSPORT = "enableLocalTransport";
protected final static String SORTED_LEDGER_STORAGE_ENABLED = "sortedLedgerStorageEnabled";
protected final static String SKIP_LIST_SIZE_LIMIT = "skipListSizeLimit";
@@ -1525,6 +1526,29 @@ public class ServerConfiguration extends AbstractConfiguration {
}
/**
+ * Get whether to listen for local JVM clients. Defaults to false.
+ *
+ * @return true if the bookie will listen for local JVM clients
+ */
+ public boolean isEnableLocalTransport() {
+ return getBoolean(ENABLE_LOCAL_TRANSPORT, false);
+ }
+
+ /**
+ * Configure the bookie to listen for BookKeeper clients executed on the local JVM
+ *
+ * @see #isEnableLocalTransport
+ * @param enableLocalTransport
+ * whether to listen for local JVM clients
+ * @return server configuration
+ */
+ public ServerConfiguration setEnableLocalTransport(boolean enableLocalTransport) {
+ setProperty(ENABLE_LOCAL_TRANSPORT, enableLocalTransport);
+ return this;
+ }
+
+
+ /**
* Get the stats provider used by bookie.
*
* @return stats provider class
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
index b0d0327..eb0f6f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import static org.apache.bookkeeper.util.BookKeeperConstants.COLON;
+import org.jboss.netty.channel.local.LocalAddress;
/**
* This is a data wrapper class that is an InetSocketAddress, it would use the hostname
@@ -75,6 +76,13 @@ public class BookieSocketAddress {
return socketAddress;
}
+ /**
+ * Maps the socketAddress to a "local" address
+ */
+ public LocalAddress getLocalAddress() {
+ return new LocalAddress(socketAddress.toString());
+ }
+
// Return the String "serialized" version of this object.
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index bb1b207..202a5e5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -53,6 +53,9 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.ExtensionRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
+import org.jboss.netty.channel.local.LocalAddress;
/**
* Netty server for serving bookie requests
@@ -63,11 +66,13 @@ class BookieNettyServer {
final static int maxMessageSize = 0xfffff;
final ServerConfiguration conf;
final ChannelFactory serverChannelFactory;
+ final ChannelFactory jvmServerChannelFactory;
final RequestProcessor requestProcessor;
final ChannelGroup allChannels = new CleanupChannelGroup();
final AtomicBoolean isRunning = new AtomicBoolean(false);
Object suspensionLock = new Object();
boolean suspended = false;
+ final BookieSocketAddress bookieAddress;
final BookieAuthProvider.Factory authProviderFactory;
final BookieProtoEncoding.ResponseEncoder responseEncoder;
@@ -89,14 +94,20 @@ class BookieNettyServer {
serverChannelFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()),
Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build()));
+ if (conf.isEnableLocalTransport()) {
+ jvmServerChannelFactory = new DefaultLocalServerChannelFactory();
+ } else {
+ jvmServerChannelFactory = null;
+ }
+ bookieAddress = Bookie.getBookieAddress(conf);
InetSocketAddress bindAddress;
if (conf.getListeningInterface() == null) {
// listen on all interfaces
bindAddress = new InetSocketAddress(conf.getBookiePort());
} else {
- bindAddress = Bookie.getBookieAddress(conf).getSocketAddress();
+ bindAddress = bookieAddress.getSocketAddress();
}
- listenOn(bindAddress);
+ listenOn(bindAddress, bookieAddress);
}
boolean isRunning() {
@@ -120,7 +131,7 @@ class BookieNettyServer {
}
}
- private void listenOn(InetSocketAddress address) {
+ private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) {
ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
bootstrap.setPipelineFactory(new BookiePipelineFactory());
bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay());
@@ -128,6 +139,16 @@ class BookieNettyServer {
Channel listen = bootstrap.bind(address);
allChannels.add(listen);
+
+ if (conf.isEnableLocalTransport()) {
+ ServerBootstrap jvmbootstrap = new ServerBootstrap(jvmServerChannelFactory);
+ jvmbootstrap.setPipelineFactory(new BookiePipelineFactory());
+
+ // use the same address 'name', so clients that still discover bookies through ZK can find the local Bookie
+ Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress());
+ allChannels.add(jvmlisten);
+ LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
+ }
}
void start() {
@@ -136,9 +157,15 @@ class BookieNettyServer {
void shutdown() {
LOG.info("Shutting down BookieNettyServer");
+ if (conf.isEnableLocalTransport()) {
+ LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
+ }
isRunning.set(false);
allChannels.close().awaitUninterruptibly();
serverChannelFactory.releaseExternalResources();
+ if (conf.isEnableLocalTransport()) {
+ jvmServerChannelFactory.releaseExternalResources();
+ }
}
private class BookiePipelineFactory implements ChannelPipelineFactory {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
new file mode 100644
index 0000000..f123aa6
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
+/**
+ * Local registry for embedded Bookies
+ */
+class LocalBookiesRegistry {
+
+ private final static ConcurrentHashMap<BookieSocketAddress,Boolean> localBookiesRegistry
+ = new ConcurrentHashMap<>();
+
+ static void registerLocalBookieAddress(BookieSocketAddress address) {
+ localBookiesRegistry.put(address,Boolean.TRUE);
+ }
+ static void unregisterLocalBookieAddress(BookieSocketAddress address) {
+ localBookiesRegistry.remove(address);
+ }
+ static boolean isLocalBookie(BookieSocketAddress address) {
+ return localBookiesRegistry.containsKey(address);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 0f9feea..62f55ea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -66,6 +66,9 @@ import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
+import org.jboss.netty.channel.local.LocalAddress;
+import org.jboss.netty.channel.local.LocalClientChannelFactory;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
@@ -81,6 +84,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
+import java.net.SocketAddress;
+import org.jboss.netty.channel.ChannelFactory;
/**
* This class manages all details of connection to a particular bookie. It also
@@ -105,7 +110,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
public static final AtomicLong txnIdGenerator = new AtomicLong(0);
final BookieSocketAddress addr;
- final ClientSocketChannelFactory channelFactory;
+ final ChannelFactory channelFactory;
final OrderedSafeExecutor executor;
final HashedWheelTimer requestTimer;
final int addEntryTimeout;
@@ -161,7 +166,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
this.conf = conf;
this.addr = addr;
this.executor = executor;
- this.channelFactory = channelFactory;
+ if (LocalBookiesRegistry.isLocalBookie(addr)){
+ this.channelFactory = new DefaultLocalClientChannelFactory();
+ } else {
+ this.channelFactory = channelFactory;
+ }
this.state = ConnectionState.DISCONNECTED;
this.requestTimer = requestTimer;
this.addEntryTimeout = conf.getAddEntryTimeout();
@@ -212,8 +221,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
bootstrap.setOption("child.receiveBufferSize", conf.getClientReceiveBufferSize());
bootstrap.setOption("writeBufferLowWaterMark", conf.getClientWriteBufferLowWaterMark());
bootstrap.setOption("writeBufferHighWaterMark", conf.getClientWriteBufferHighWaterMark());
-
- ChannelFuture future = bootstrap.connect(addr.getSocketAddress());
+ SocketAddress bookieAddr = addr.getSocketAddress();
+ if (channelFactory instanceof LocalClientChannelFactory) {
+ bookieAddr = addr.getLocalAddress();
+ }
+ ChannelFuture future = bootstrap.connect(bookieAddr);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java
new file mode 100644
index 0000000..4289476
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java
@@ -0,0 +1,67 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BaseTestCase;
+import org.junit.Test;
+
+/**
+ * Tests of the main BookKeeper client using networkless communication
+ */
+public class LocalBookKeeperTest extends BaseTestCase {
+
+ protected ServerConfiguration newServerConfiguration() throws Exception {
+ return super
+ .newServerConfiguration()
+ .setEnableLocalTransport(true);
+ }
+
+ DigestType digestType;
+
+ public LocalBookKeeperTest(DigestType digestType) {
+ super(4);
+ this.digestType=digestType;
+ }
+
+ @Test
+ public void testUseLocalBookie() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setZkServers(zkUtil.getZooKeeperConnectString())
+ .setZkTimeout(20000);
+
+ CountDownLatch l = new CountDownLatch(1);
+ zkUtil.sleepServer(5, l);
+ l.await();
+
+ BookKeeper bkc = new BookKeeper(conf);
+ LedgerHandle h = bkc.createLedger(1,1,digestType, "testPasswd".getBytes());
+ h.addEntry("test".getBytes());
+ h.close();
+ bkc.close();
+ }
+
+}