You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2016/04/05 18:47:36 UTC

bookkeeper git commit: BOOKKEEPER-896: VM-local transport

Repository: bookkeeper
Updated Branches:
  refs/heads/master d607b366b -> e5939ed58


BOOKKEEPER-896: VM-local transport

Author: eolivelli <eo...@gmail.com>

Reviewers: Sijie Guo <si...@apache.org>, Matteo Merli <mm...@apache.org>

Closes #21 from eolivelli/BOOKKEEPER-896


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e5939ed5
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e5939ed5
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e5939ed5

Branch: refs/heads/master
Commit: e5939ed587d81212fb032a37b2b83401359fa7fc
Parents: d607b36
Author: eolivelli <eo...@gmail.com>
Authored: Tue Apr 5 09:47:07 2016 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Tue Apr 5 09:47:07 2016 -0700

----------------------------------------------------------------------
 .../bookkeeper/conf/ServerConfiguration.java    | 24 +++++++
 .../bookkeeper/net/BookieSocketAddress.java     |  8 +++
 .../bookkeeper/proto/BookieNettyServer.java     | 33 +++++++++-
 .../bookkeeper/proto/LocalBookiesRegistry.java  | 44 +++++++++++++
 .../proto/PerChannelBookieClient.java           | 20 ++++--
 .../bookkeeper/client/LocalBookKeeperTest.java  | 67 ++++++++++++++++++++
 6 files changed, 189 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d770650..686104d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -110,6 +110,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     // Whether the bookie should use its hostname or ipaddress for the
     // registration.
     protected final static String USE_HOST_NAME_AS_BOOKIE_ID = "useHostNameAsBookieID";
+    protected final static String ENABLE_LOCAL_TRANSPORT = "enableLocalTransport";
 
     protected final static String SORTED_LEDGER_STORAGE_ENABLED = "sortedLedgerStorageEnabled";
     protected final static String SKIP_LIST_SIZE_LIMIT = "skipListSizeLimit";
@@ -1525,6 +1526,29 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get hwhether to use listen for local JVM clients. Defaults to false.
+     *
+     * @return true, then bookie will be listen for local JVM clients
+     */
+    public boolean isEnableLocalTransport() {
+        return getBoolean(ENABLE_LOCAL_TRANSPORT, false);
+    }
+
+    /**
+     * Configure the bookie to listen for BookKeeper clients executed on the local JVM
+     *
+     * @see #getEnableLocalTransport
+     * @param enableLocalTransport
+     *            whether to use listen for local JVM clients
+     * @return server configuration
+     */
+    public ServerConfiguration setEnableLocalTransport(boolean enableLocalTransport) {
+        setProperty(ENABLE_LOCAL_TRANSPORT, enableLocalTransport);
+        return this;
+    }
+
+
+    /**
      * Get the stats provider used by bookie.
      *
      * @return stats provider class

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
index b0d0327..eb0f6f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
 import static org.apache.bookkeeper.util.BookKeeperConstants.COLON;
+import org.jboss.netty.channel.local.LocalAddress;
 
 /**
  * This is a data wrapper class that is an InetSocketAddress, it would use the hostname
@@ -75,6 +76,13 @@ public class BookieSocketAddress {
         return socketAddress;
     }
 
+    /**
+     * Maps the socketAddress to a "local" address
+     */
+    public LocalAddress getLocalAddress() {
+        return new LocalAddress(socketAddress.toString());
+    }
+
     // Return the String "serialized" version of this object.
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index bb1b207..202a5e5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -53,6 +53,9 @@ import org.slf4j.LoggerFactory;
 import com.google.protobuf.ExtensionRegistry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
+import org.jboss.netty.channel.local.LocalAddress;
 
 /**
  * Netty server for serving bookie requests
@@ -63,11 +66,13 @@ class BookieNettyServer {
     final static int maxMessageSize = 0xfffff;
     final ServerConfiguration conf;
     final ChannelFactory serverChannelFactory;
+    final ChannelFactory jvmServerChannelFactory;
     final RequestProcessor requestProcessor;
     final ChannelGroup allChannels = new CleanupChannelGroup();
     final AtomicBoolean isRunning = new AtomicBoolean(false);
     Object suspensionLock = new Object();
     boolean suspended = false;
+    final BookieSocketAddress bookieAddress;
 
     final BookieAuthProvider.Factory authProviderFactory;
     final BookieProtoEncoding.ResponseEncoder responseEncoder;
@@ -89,14 +94,20 @@ class BookieNettyServer {
         serverChannelFactory = new NioServerSocketChannelFactory(
                 Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()),
                 Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build()));
+        if (conf.isEnableLocalTransport()) {
+            jvmServerChannelFactory = new DefaultLocalServerChannelFactory();
+        } else {
+            jvmServerChannelFactory = null;
+        }
+        bookieAddress = Bookie.getBookieAddress(conf);
         InetSocketAddress bindAddress;
         if (conf.getListeningInterface() == null) {
             // listen on all interfaces
             bindAddress = new InetSocketAddress(conf.getBookiePort());
         } else {
-            bindAddress = Bookie.getBookieAddress(conf).getSocketAddress();
+            bindAddress = bookieAddress.getSocketAddress();
         }
-        listenOn(bindAddress);
+        listenOn(bindAddress, bookieAddress);
     }
 
     boolean isRunning() {
@@ -120,7 +131,7 @@ class BookieNettyServer {
         }
     }
 
-    private void listenOn(InetSocketAddress address) {
+    private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) {
         ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
         bootstrap.setPipelineFactory(new BookiePipelineFactory());
         bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay());
@@ -128,6 +139,16 @@ class BookieNettyServer {
 
         Channel listen = bootstrap.bind(address);
         allChannels.add(listen);
+
+        if (conf.isEnableLocalTransport()) {
+            ServerBootstrap jvmbootstrap = new ServerBootstrap(jvmServerChannelFactory);
+            jvmbootstrap.setPipelineFactory(new BookiePipelineFactory());
+
+            // use the same address 'name', so clients can find local Bookie still discovering them using ZK
+            Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress());
+            allChannels.add(jvmlisten);
+            LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
+        }
     }
 
     void start() {
@@ -136,9 +157,15 @@ class BookieNettyServer {
 
     void shutdown() {
         LOG.info("Shutting down BookieNettyServer");
+        if (conf.isEnableLocalTransport()) {
+            LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
+        }
         isRunning.set(false);
         allChannels.close().awaitUninterruptibly();
         serverChannelFactory.releaseExternalResources();
+        if (conf.isEnableLocalTransport()) {
+            jvmServerChannelFactory.releaseExternalResources();
+        }
     }
 
     private class BookiePipelineFactory implements ChannelPipelineFactory {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
new file mode 100644
index 0000000..f123aa6
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
+/**
+ * Local registry for embedded Bookies
+ */
+class LocalBookiesRegistry {
+    
+    private final static ConcurrentHashMap<BookieSocketAddress,Boolean> localBookiesRegistry
+            = new ConcurrentHashMap<>();
+    
+    static void registerLocalBookieAddress(BookieSocketAddress address) {        
+        localBookiesRegistry.put(address,Boolean.TRUE);
+    }
+    static void unregisterLocalBookieAddress(BookieSocketAddress address) {
+        localBookiesRegistry.remove(address);
+    }
+    static boolean isLocalBookie(BookieSocketAddress address) {        
+        return localBookiesRegistry.containsKey(address);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 0f9feea..62f55ea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -66,6 +66,9 @@ import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
+import org.jboss.netty.channel.local.LocalAddress;
+import org.jboss.netty.channel.local.LocalClientChannelFactory;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
@@ -81,6 +84,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
+import java.net.SocketAddress;
+import org.jboss.netty.channel.ChannelFactory;
 
 /**
  * This class manages all details of connection to a particular bookie. It also
@@ -105,7 +110,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     public static final AtomicLong txnIdGenerator = new AtomicLong(0);
 
     final BookieSocketAddress addr;
-    final ClientSocketChannelFactory channelFactory;
+    final ChannelFactory channelFactory;
     final OrderedSafeExecutor executor;
     final HashedWheelTimer requestTimer;
     final int addEntryTimeout;
@@ -161,7 +166,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         this.conf = conf;
         this.addr = addr;
         this.executor = executor;
-        this.channelFactory = channelFactory;
+        if (LocalBookiesRegistry.isLocalBookie(addr)){
+            this.channelFactory = new DefaultLocalClientChannelFactory();
+        } else {
+            this.channelFactory = channelFactory;
+        }
         this.state = ConnectionState.DISCONNECTED;
         this.requestTimer = requestTimer;
         this.addEntryTimeout = conf.getAddEntryTimeout();
@@ -212,8 +221,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         bootstrap.setOption("child.receiveBufferSize", conf.getClientReceiveBufferSize());
         bootstrap.setOption("writeBufferLowWaterMark", conf.getClientWriteBufferLowWaterMark());
         bootstrap.setOption("writeBufferHighWaterMark", conf.getClientWriteBufferHighWaterMark());
-
-        ChannelFuture future = bootstrap.connect(addr.getSocketAddress());
+        SocketAddress bookieAddr = addr.getSocketAddress();        
+        if (channelFactory instanceof LocalClientChannelFactory) {
+            bookieAddr = addr.getLocalAddress();
+        }
+        ChannelFuture future = bootstrap.connect(bookieAddr);
         future.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e5939ed5/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java
new file mode 100644
index 0000000..4289476
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LocalBookKeeperTest.java
@@ -0,0 +1,67 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BaseTestCase;
+import org.junit.Test;
+
+/**
+ * Tests of the main BookKeeper client using networkless comunication
+ */
+public class LocalBookKeeperTest extends BaseTestCase {
+    
+    protected ServerConfiguration newServerConfiguration() throws Exception {       
+        return super
+                .newServerConfiguration()
+                .setEnableLocalTransport(true);
+    }
+        
+    DigestType digestType;
+    
+    public LocalBookKeeperTest(DigestType digestType) {
+        super(4);            
+        this.digestType=digestType;
+    }
+
+    @Test
+    public void testUseLocalBookie() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(20000);
+
+        CountDownLatch l = new CountDownLatch(1);
+        zkUtil.sleepServer(5, l);
+        l.await();
+                
+        BookKeeper bkc = new BookKeeper(conf);
+        LedgerHandle h = bkc.createLedger(1,1,digestType, "testPasswd".getBytes());
+        h.addEntry("test".getBytes());
+        h.close();
+        bkc.close();
+    }
+
+}