You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/10 15:41:13 UTC

[GitHub] sijie commented on a change in pull request #1595: Make bookie client an interface

sijie commented on a change in pull request #1595: Make bookie client an interface
URL: https://github.com/apache/bookkeeper/pull/1595#discussion_r209301821
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
 ##########
 @@ -20,579 +20,60 @@
  */
 package org.apache.bookkeeper.proto;
 
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ExtensionRegistry;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
-import org.apache.bookkeeper.auth.ClientAuthProvider;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.SafeRunnable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.tls.SecurityException;
-import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Implements the client-side part of the BookKeeper protocol.
- *
+ * Low level client for talking to bookies.
  */
-public class BookieClient implements PerChannelBookieClientFactory {
-    static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
-
-    // This is global state that should be across all BookieClients
-    AtomicLong totalBytesOutstanding = new AtomicLong();
-
-    OrderedExecutor executor;
-    ScheduledExecutorService scheduler;
-    ScheduledFuture<?> timeoutFuture;
-
-    EventLoopGroup eventLoopGroup;
-    final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
-            new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
-
-    private final ClientAuthProvider.Factory authProviderFactory;
-    private final ExtensionRegistry registry;
-
-    private final ClientConfiguration conf;
-    private volatile boolean closed;
-    private final ReentrantReadWriteLock closeLock;
-    private final StatsLogger statsLogger;
-    private final int numConnectionsPerBookie;
-
-    private final long bookieErrorThresholdPerInterval;
-
-    public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
-                        OrderedExecutor executor, ScheduledExecutorService scheduler,
-                        StatsLogger statsLogger) throws IOException {
-        this.conf = conf;
-        this.eventLoopGroup = eventLoopGroup;
-        this.executor = executor;
-        this.closed = false;
-        this.closeLock = new ReentrantReadWriteLock();
-
-        this.registry = ExtensionRegistry.newInstance();
-        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf);
-
-        this.statsLogger = statsLogger;
-        this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
-        this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval();
-
-        this.scheduler = scheduler;
-        if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
-            SafeRunnable monitor = safeRun(() -> {
-                monitorPendingOperations();
-            });
-            this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
-                                                                    conf.getTimeoutMonitorIntervalSec(),
-                                                                    conf.getTimeoutMonitorIntervalSec(),
-                                                                    TimeUnit.SECONDS);
-        }
-    }
-
-    private int getRc(int rc) {
-        if (BKException.Code.OK == rc) {
-            return rc;
-        } else {
-            if (closed) {
-                return BKException.Code.ClientClosedException;
-            } else {
-                return rc;
-            }
-        }
-    }
-
-    public List<BookieSocketAddress> getFaultyBookies() {
-        List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
-        for (PerChannelBookieClientPool channelPool : channels.values()) {
-            if (channelPool instanceof DefaultPerChannelBookieClientPool) {
-                DefaultPerChannelBookieClientPool pool = (DefaultPerChannelBookieClientPool) channelPool;
-                if (pool.errorCounter.getAndSet(0) >= bookieErrorThresholdPerInterval) {
-                    faultyBookies.add(pool.address);
-                }
-            }
-        }
-        return faultyBookies;
-    }
-
-    public boolean isWritable(BookieSocketAddress address, long key) {
-        final PerChannelBookieClientPool pcbcPool = lookupClient(address);
-        // if null, let the write initiate connect of fail with whatever error it produces
-        return pcbcPool == null || pcbcPool.isWritable(key);
-    }
-
-    @Override
-    public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
-            SecurityHandlerFactory shFactory) throws SecurityException {
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger,
-                                          authProviderFactory, registry, pcbcPool, shFactory);
-    }
-
-    public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
-        PerChannelBookieClientPool clientPool = channels.get(addr);
-        if (null == clientPool) {
-            closeLock.readLock().lock();
-            try {
-                if (closed) {
-                    return null;
-                }
-                PerChannelBookieClientPool newClientPool =
-                    new DefaultPerChannelBookieClientPool(conf, this, addr, numConnectionsPerBookie);
-                PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
-                if (null == oldClientPool) {
-                    clientPool = newClientPool;
-                    // initialize the pool only after we put the pool into the map
-                    clientPool.intialize();
-                } else {
-                    clientPool = oldClientPool;
-                    newClientPool.close(false);
-                }
-            } catch (SecurityException e) {
-                LOG.error("Security Exception in creating new default PCBC pool: ", e);
-                return null;
-            } finally {
-                closeLock.readLock().unlock();
-            }
-        }
-        return clientPool;
-    }
-
-    public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
-            final ForceLedgerCallback cb, final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.forceLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                              ledgerId, addr, ctx);
-            return;
-        }
-
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.forceLedgerComplete(rc, ledgerId, addr, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
-                }
-            } else {
-                pcbc.forceLedger(ledgerId, cb, ctx);
-            }
-        }, ledgerId);
-    }
-
-    public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
-            final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                              ledgerId, addr, ctx);
-            return;
-        }
-
-        toSend.retain();
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.writeLacComplete(rc, ledgerId, addr, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
-                }
-            } else {
-                pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
-            }
-
-            toSend.release();
-        }, ledgerId);
-    }
-
-    private void completeAdd(final int rc,
-                             final long ledgerId,
-                             final long entryId,
-                             final BookieSocketAddress addr,
-                             final WriteCallback cb,
-                             final Object ctx) {
-        try {
-            executor.executeOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
-                }
-                @Override
-                public String toString() {
-                    return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
-        }
-    }
-
-    public void addEntry(final BookieSocketAddress addr,
-                         final long ledgerId,
-                         final byte[] masterKey,
-                         final long entryId,
-                         final ByteBufList toSend,
-                         final WriteCallback cb,
-                         final Object ctx,
-                         final int options,
-                         final boolean allowFastFail,
-                         final EnumSet<WriteFlag> writeFlags) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
-                        ledgerId, entryId, addr, cb, ctx);
-            return;
-        }
-
-        // Retain the buffer, since the connection could be obtained after
-        // the PendingApp might have already failed
-        toSend.retain();
-
-        client.obtain(ChannelReadyForAddEntryCallback.create(
-                              this, toSend, ledgerId, entryId, addr,
-                                  ctx, cb, options, masterKey, allowFastFail, writeFlags),
-                      ledgerId);
-    }
-
-    private void completeRead(final int rc,
-                              final long ledgerId,
-                              final long entryId,
-                              final ByteBuf entry,
-                              final ReadEntryCallback cb,
-                              final Object ctx) {
-        try {
-            executor.executeOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
-                                 ledgerId, entryId, entry, ctx);
-        }
-    }
-
-    private static class ChannelReadyForAddEntryCallback
-        implements GenericCallback<PerChannelBookieClient> {
-        private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
-
-        private BookieClient bookieClient;
-        private ByteBufList toSend;
-        private long ledgerId;
-        private long entryId;
-        private BookieSocketAddress addr;
-        private Object ctx;
-        private WriteCallback cb;
-        private int options;
-        private byte[] masterKey;
-        private boolean allowFastFail;
-        private EnumSet<WriteFlag> writeFlags;
-
-        static ChannelReadyForAddEntryCallback create(
-                BookieClient bookieClient, ByteBufList toSend, long ledgerId,
-                long entryId, BookieSocketAddress addr, Object ctx,
-                WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
-                EnumSet<WriteFlag> writeFlags) {
-            ChannelReadyForAddEntryCallback callback = RECYCLER.get();
-            callback.bookieClient = bookieClient;
-            callback.toSend = toSend;
-            callback.ledgerId = ledgerId;
-            callback.entryId = entryId;
-            callback.addr = addr;
-            callback.ctx = ctx;
-            callback.cb = cb;
-            callback.options = options;
-            callback.masterKey = masterKey;
-            callback.allowFastFail = allowFastFail;
-            callback.writeFlags = writeFlags;
-            return callback;
-        }
-
-        @Override
-        public void operationComplete(final int rc,
-                                      PerChannelBookieClient pcbc) {
-            if (rc != BKException.Code.OK) {
-                bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
-            } else {
-                pcbc.addEntry(ledgerId, masterKey, entryId,
-                              toSend, cb, ctx, options, allowFastFail, writeFlags);
-            }
-
-            toSend.release();
-            recycle();
-        }
-
-        private ChannelReadyForAddEntryCallback(
-                Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
-
-        private static final Recycler<ChannelReadyForAddEntryCallback> RECYCLER =
-            new Recycler<ChannelReadyForAddEntryCallback>() {
-                    protected ChannelReadyForAddEntryCallback newObject(
-                            Recycler.Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
-                        return new ChannelReadyForAddEntryCallback(recyclerHandle);
-                    }
-                };
-
-        public void recycle() {
-            bookieClient = null;
-            toSend = null;
-            ledgerId = -1;
-            entryId = -1;
-            addr = null;
-            ctx = null;
-            cb = null;
-            options = -1;
-            masterKey = null;
-            allowFastFail = false;
-            writeFlags = null;
-            recyclerHandle.recycle(this);
-        }
-    }
-
-    public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
-            final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
-                    ctx);
-            return;
-        }
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.readLacComplete(rc, ledgerId, null, null, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.readLacComplete(getRc(BKException.Code.InterruptedException),
-                            ledgerId, null, null, ctx);
-                }
-            } else {
-                pcbc.readLac(ledgerId, cb, ctx);
-            }
-        }, ledgerId);
-    }
-
-    public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
-                          ReadEntryCallback cb, Object ctx, int flags) {
+public interface BookieClient {
 
 Review comment:
   since we are abstracting the interface, it is a good time to add comments for individual methods. it is an internal contract between components.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services