You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:27 UTC
[17/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
deleted file mode 100644
index 1b6a4a3..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.util.Random;
-import java.util.concurrent.Semaphore;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.util.MathUtils;
-import static com.google.common.base.Charsets.UTF_8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BookkeeperBenchmark extends AbstractBenchmark {
-
- static final Logger logger = LoggerFactory.getLogger(BookkeeperBenchmark.class);
-
- BookKeeper bk;
- LedgerHandle[] lh;
-
- public BookkeeperBenchmark(String zkHostPort) throws Exception {
- bk = new BookKeeper(zkHostPort);
- int numLedgers = Integer.getInteger("nLedgers",5);
- lh = new LedgerHandle[numLedgers];
- int quorumSize = Integer.getInteger("quorum", 2);
- int ensembleSize = Integer.getInteger("ensemble", 4);
- DigestType digestType = DigestType.valueOf(System.getProperty("digestType", "CRC32"));
- for (int i=0; i< numLedgers; i++) {
- lh[i] = bk.createLedger(ensembleSize, quorumSize, digestType, "blah".getBytes(UTF_8));
- }
-
- }
-
-
- @Override
- void doOps(final int numOps) throws Exception {
- int size = Integer.getInteger("size", 1024);
- byte[] msg = new byte[size];
-
- int numOutstanding = Integer.getInteger("nPars",1000);
- final Semaphore outstanding = new Semaphore(numOutstanding);
-
- AddCallback callback = new AddCallback() {
- AbstractCallback handler = new AbstractCallback(outstanding, numOps);
-
-
- @Override
- public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
- handler.handle(rc == BKException.Code.OK, ctx);
- }
-
- };
-
-
-
- Random rand = new Random();
-
- for (int i=0; i<numOps; i++) {
- outstanding.acquire();
- lh[rand.nextInt(lh.length)].asyncAddEntry(msg, callback, MathUtils.now());
- }
-
-
- }
-
- @Override
- public void tearDown() throws Exception {
- bk.close();
- }
-
-
- public static void main(String[] args) throws Exception {
- BookkeeperBenchmark benchmark = new BookkeeperBenchmark(args[0]);
- benchmark.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
deleted file mode 100644
index ba81834..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelHandler.Sharable;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Log4JLoggerFactory;
-
-@Sharable
-public class FakeBookie extends SimpleChannelHandler implements
- ChannelPipelineFactory {
- private static final Logger logger = LoggerFactory.getLogger(FakeBookie.class);
- ServerSocketChannelFactory serverChannelFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-
- public FakeBookie(int port) {
- InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
- ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
-
- bootstrap.setPipelineFactory(this);
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
-
- logger.info("Going into receive loop");
- // Bind and start to accept incoming connections.
- bootstrap.bind(new InetSocketAddress(port));
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("lengthbaseddecoder",
- new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("main", this);
- return pipeline;
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- if (!(e.getMessage() instanceof ChannelBuffer)) {
- ctx.sendUpstream(e);
- return;
- }
-
- ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-
- int type = buffer.readInt();
- buffer.readerIndex(24);
- long ledgerId = buffer.readLong();
- long entryId = buffer.readLong();
-
- ChannelBuffer outBuf = ctx.getChannel().getConfig().getBufferFactory()
- .getBuffer(24);
- outBuf.writeInt(type);
- outBuf.writeInt(0); // rc
- outBuf.writeLong(ledgerId);
- outBuf.writeLong(entryId);
- e.getChannel().write(outBuf);
-
- }
-
-
- public static void main(String args[]) {
- new FakeBookie(Integer.parseInt(args[0]));
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
deleted file mode 100644
index 2611bc0..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import com.google.protobuf.ByteString;
-
-public class ByteStringInterner {
- // TODO: how to release references when strings are no longer used. weak
- // references?
-
- private static final ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
-
- public static ByteString intern(ByteString in) {
- ByteString presentValueInMap = map.putIfAbsent(in, in);
- if (presentValueInMap != null) {
- return presentValueInMap;
- }
- return in;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
deleted file mode 100644
index 237c7de..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
+++ /dev/null
@@ -1,666 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hedwig.conf.AbstractConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.topics.HubLoad;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import com.google.protobuf.ByteString;
-
-public class ServerConfiguration extends AbstractConfiguration {
- public final static String REGION = "region";
- protected final static String MAX_MESSAGE_SIZE = "max_message_size";
- protected final static String READAHEAD_COUNT = "readahead_count";
- protected final static String READAHEAD_SIZE = "readahead_size";
- protected final static String CACHE_SIZE = "cache_size";
- protected final static String CACHE_ENTRY_TTL = "cache_entry_ttl";
- protected final static String SCAN_BACKOFF_MSEC = "scan_backoff_ms";
- protected final static String SERVER_PORT = "server_port";
- protected final static String SSL_SERVER_PORT = "ssl_server_port";
- protected final static String ZK_PREFIX = "zk_prefix";
- protected final static String ZK_HOST = "zk_host";
- protected final static String ZK_TIMEOUT = "zk_timeout";
- protected final static String READAHEAD_ENABLED = "readahead_enabled";
- protected final static String STANDALONE = "standalone";
- protected final static String REGIONS = "regions";
- protected final static String CERT_NAME = "cert_name";
- protected final static String CERT_PATH = "cert_path";
- protected final static String PASSWORD = "password";
- protected final static String SSL_ENABLED = "ssl_enabled";
- protected final static String CONSUME_INTERVAL = "consume_interval";
- protected final static String INIT_NUM_TOPICS = "init_num_topics";
- protected final static String MAX_NUM_TOPICS = "max_num_topics";
- protected final static String RETENTION_SECS = "retention_secs";
- protected final static String RETENTION_SECS_AFTER_ACCESS = "retention_secs_after_access";
- protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
- protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
- protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
- @Deprecated
- protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
- protected final static String BK_WRITE_QUORUM_SIZE = "bk_write_quorum_size";
- protected final static String BK_ACK_QUORUM_SIZE = "bk_ack_quorum_size";
- protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
- protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
- "default_message_window_size";
- protected final static String NUM_READAHEAD_CACHE_THREADS = "num_readahead_cache_threads";
- protected final static String NUM_DELIVERY_THREADS = "num_delivery_threads";
-
- protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
- protected final static String REBALANCE_TOLERANCE_PERCENTAGE = "rebalance_tolerance";
- protected final static String REBALANCE_MAX_SHED = "rebalance_max_shed";
- protected final static String REBALANCE_INTERVAL_SEC = "rebalance_interval_sec";
-
- // manager related settings
- protected final static String METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED = "metadata_manager_based_topic_manager_enabled";
- protected final static String METADATA_MANAGER_FACTORY_CLASS = "metadata_manager_factory_class";
-
- // metastore settings, only being used when METADATA_MANAGER_FACTORY_CLASS is MsMetadataManagerFactory
- protected final static String METASTORE_IMPL_CLASS = "metastore_impl_class";
- protected final static String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
-
- private static ClassLoader defaultLoader;
- static {
- defaultLoader = Thread.currentThread().getContextClassLoader();
- if (null == defaultLoader) {
- defaultLoader = ServerConfiguration.class.getClassLoader();
- }
- }
-
- // these are the derived attributes
- protected ByteString myRegionByteString = null;
- protected HedwigSocketAddress myServerAddress = null;
- protected List<String> regionList = null;
-
- // Although this method is not currently used, currently maintaining it like
- // this so that we can support on-the-fly changes in configuration
- protected void refreshDerivedAttributes() {
- refreshMyRegionByteString();
- refreshMyServerAddress();
- refreshRegionList();
- }
-
- @Override
- public void loadConf(URL confURL) throws ConfigurationException {
- super.loadConf(confURL);
- refreshDerivedAttributes();
- }
-
- public int getMaximumMessageSize() {
- return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
- }
-
- public String getMyRegion() {
- return conf.getString(REGION, "standalone");
- }
-
- protected void refreshMyRegionByteString() {
- myRegionByteString = ByteString.copyFromUtf8(getMyRegion());
- }
-
- protected void refreshMyServerAddress() {
- try {
- // Use the raw IP address as the hostname
- myServerAddress = new HedwigSocketAddress(InetAddress.getLocalHost().getHostAddress(), getServerPort(),
- getSSLServerPort());
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- }
-
- // The expected format for the regions parameter is Hostname:Port:SSLPort
- // with spaces in between each of the regions.
- protected void refreshRegionList() {
- String regions = conf.getString(REGIONS, "");
- if (regions.isEmpty()) {
- regionList = new LinkedList<String>();
- } else {
- regionList = Arrays.asList(regions.split(" "));
- }
- }
-
- public ByteString getMyRegionByteString() {
- if (myRegionByteString == null) {
- refreshMyRegionByteString();
- }
- return myRegionByteString;
- }
-
- /**
- * Maximum number of messages to read ahead. Default is 10.
- *
- * @return int
- */
- public int getReadAheadCount() {
- return conf.getInt(READAHEAD_COUNT, 10);
- }
-
- /**
- * Maximum number of bytes to read ahead. Default is 4MB.
- *
- * @return long
- */
- public long getReadAheadSizeBytes() {
- return conf.getLong(READAHEAD_SIZE, 4 * 1024 * 1024); // 4M
- }
-
- /**
- * Maximum cache size. By default is the smallest of 2G or
- * half the heap size.
- *
- * @return long
- */
- public long getMaximumCacheSize() {
- // 2G or half of the maximum amount of memory the JVM uses
- return conf.getLong(CACHE_SIZE, Math.min(2 * 1024L * 1024L * 1024L, Runtime.getRuntime().maxMemory() / 2));
- }
-
- /**
- * Cache Entry TTL. By default is 0, cache entry will not be evicted
- * until the cache is fullfilled or the messages are already consumed.
- * The TTL is only checked when trying adding a new entry into the cache.
- *
- * @return cache entry ttl.
- */
- public long getCacheEntryTTL() {
- return conf.getLong(CACHE_ENTRY_TTL, 0L);
- }
-
- /**
- * After a scan of a log fails, how long before we retry (in msec)
- *
- * @return long
- */
- public long getScanBackoffPeriodMs() {
- return conf.getLong(SCAN_BACKOFF_MSEC, 1000);
- }
-
- /**
- * Returns server port.
- *
- * @return int
- */
- public int getServerPort() {
- return conf.getInt(SERVER_PORT, 4080);
- }
-
- /**
- * Returns SSL server port.
- *
- * @return int
- */
- public int getSSLServerPort() {
- return conf.getInt(SSL_SERVER_PORT, 9876);
- }
-
- /**
- * Returns ZooKeeper path prefix.
- *
- * @return string
- */
- public String getZkPrefix() {
- return conf.getString(ZK_PREFIX, "/hedwig");
- }
-
- public StringBuilder getZkRegionPrefix(StringBuilder sb) {
- return sb.append(getZkPrefix()).append("/").append(getMyRegion());
- }
-
- /**
- * Get znode path to store manager layouts.
- *
- * @param sb
- * StringBuilder to store znode path to store manager layouts.
- * @return znode path to store manager layouts.
- */
- public StringBuilder getZkManagersPrefix(StringBuilder sb) {
- return getZkRegionPrefix(sb).append("/managers");
- }
-
- public StringBuilder getZkTopicsPrefix(StringBuilder sb) {
- return getZkRegionPrefix(sb).append("/topics");
- }
-
- public StringBuilder getZkTopicPath(StringBuilder sb, ByteString topic) {
- return getZkTopicsPrefix(sb).append("/").append(topic.toStringUtf8());
- }
-
- public StringBuilder getZkHostsPrefix(StringBuilder sb) {
- return getZkRegionPrefix(sb).append("/hosts");
- }
-
- public HedwigSocketAddress getServerAddr() {
- if (myServerAddress == null) {
- refreshMyServerAddress();
- }
- return myServerAddress;
- }
-
- /**
- * Return ZooKeeper list of servers. Default is localhost.
- *
- * @return String
- */
- public String getZkHost() {
- List servers = conf.getList(ZK_HOST, null);
- if (null == servers || 0 == servers.size()) {
- return "localhost";
- }
- return StringUtils.join(servers, ",");
- }
-
- /**
- * Return ZooKeeper session timeout. Default is 2s.
- *
- * @return int
- */
- public int getZkTimeout() {
- return conf.getInt(ZK_TIMEOUT, 2000);
- }
-
- /**
- * Returns true if read-ahead enabled. Default is true.
- *
- * @return boolean
- */
- public boolean getReadAheadEnabled() {
- return conf.getBoolean(READAHEAD_ENABLED, true)
- || conf.getBoolean("readhead_enabled");
- // the key was misspelt in a previous version, so compensate here
- }
-
- /**
- * Returns true if standalone. Default is false.
- *
- * @return boolean
- */
- public boolean isStandalone() {
- return conf.getBoolean(STANDALONE, false);
- }
-
- /**
- * Returns list of regions.
- *
- * @return List<String>
- */
- public List<String> getRegions() {
- if (regionList == null) {
- refreshRegionList();
- }
- return regionList;
- }
-
- /**
- * Returns the name of the SSL certificate if available as a resource.
- *
- * @return String
- */
- public String getCertName() {
- return conf.getString(CERT_NAME, "");
- }
-
- /**
- * This is the path to the SSL certificate if it is available as a file.
- *
- * @return String
- */
- public String getCertPath() {
- return conf.getString(CERT_PATH, "");
- }
-
- // This method return the SSL certificate as an InputStream based on if it
- // is configured to be available as a resource or as a file. If nothing is
- // configured correctly, then a ConfigurationException will be thrown as
- // we do not know how to obtain the SSL certificate stream.
- public InputStream getCertStream() throws FileNotFoundException, ConfigurationException {
- String certName = getCertName();
- String certPath = getCertPath();
- if (certName != null && !certName.isEmpty()) {
- return getClass().getResourceAsStream(certName);
- } else if (certPath != null && !certPath.isEmpty()) {
- return new FileInputStream(certPath);
- } else
- throw new ConfigurationException("SSL Certificate configuration does not have resource name or path set!");
- }
-
- /**
- * Returns the password used for BookKeeper ledgers. Default
- * is the empty string.
- *
- * @return
- */
- public String getPassword() {
- return conf.getString(PASSWORD, "");
- }
-
- /**
- * Returns true if SSL is enabled. Default is false.
- *
- * @return boolean
- */
- public boolean isSSLEnabled() {
- return conf.getBoolean(SSL_ENABLED, false);
- }
-
- /**
- * Gets the number of messages consumed before persisting
- * information about consumed messages. A value greater than
- * one avoids persisting information about consumed messages
- * upon every consumed message. Default is 50.
- *
- * @return int
- */
- public int getConsumeInterval() {
- return conf.getInt(CONSUME_INTERVAL, 50);
- }
-
- /**
- * Returns the interval to release a topic. If this
- * parameter is greater than zero, then schedule a
- * task to release an owned topic. Default is 0 (never released).
- *
- * @return int
- */
- public int getRetentionSecs() {
- return conf.getInt(RETENTION_SECS, 0);
- }
-
- /**
- * Specifies that the topic should be automatically released
- * once a fixed duration after the topic is owned, a message is
- * published, or a message is delivered.
- *
- * @return the length of time after an entry is last accessed that
- * it should be automatically removed.
- */
- public int getRetentionSecsAfterAccess() {
- return conf.getInt(RETENTION_SECS_AFTER_ACCESS, 0);
- }
-
- /**
- * Max number of topics for a hub server to serve.
- *
- * @return max number of topics for a hub server to serve.
- */
- public int getMaxNumTopics() {
- return conf.getInt(MAX_NUM_TOPICS, Integer.MAX_VALUE);
- }
-
- /**
- * Minimum size of internal structure to store topics.
- *
- * @return init number of topics for a hub server.
- */
- public int getInitNumTopics() {
- return conf.getInt(INIT_NUM_TOPICS, 128);
- }
-
- /**
- * True if SSL is enabled across regions.
- *
- * @return boolean
- */
- public boolean isInterRegionSSLEnabled() {
- return conf.getBoolean(INTER_REGION_SSL_ENABLED, false);
- }
-
- /**
- * This parameter is used to determine how often we run the
- * SubscriptionManager's Messages Consumed timer task thread
- * (in milliseconds).
- *
- * @return int
- */
- public int getMessagesConsumedThreadRunInterval() {
- return conf.getInt(MESSAGES_CONSUMED_THREAD_RUN_INTERVAL, 60000);
- }
-
- /**
- * This parameter is used to determine how often we run a thread
- * to retry those failed remote subscriptions in asynchronous mode
- * (in milliseconds).
- *
- * @return int
- */
- public int getRetryRemoteSubscribeThreadRunInterval() {
- return conf.getInt(RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL, 120000);
- }
-
- /**
- * This parameter is for setting the default maximum number of messages which
- * can be delivered to a subscriber without being consumed.
- * we pause messages delivery to a subscriber when reaching the window size
- *
- * @return int
- */
- public int getDefaultMessageWindowSize() {
- return conf.getInt(DEFAULT_MESSAGE_WINDOW_SIZE, 0);
- }
-
- /**
- * This parameter is used when Bookkeeper is the persistence
- * store and indicates what the ensemble size is (i.e. how
- * many bookie servers to stripe the ledger entries across).
- *
- * @return int
- */
- public int getBkEnsembleSize() {
- return conf.getInt(BK_ENSEMBLE_SIZE, 3);
- }
-
-
- /**
- * This parameter is used when Bookkeeper is the persistence store
- * and indicates what the quorum size is (i.e. how many redundant
- * copies of each ledger entry is written).
- *
- * @return int
- * @deprecated please use #getBkWriteQuorumSize() and #getBkAckQuorumSize()
- */
- @Deprecated
- protected int getBkQuorumSize() {
- return conf.getInt(BK_QUORUM_SIZE, 2);
- }
-
- /**
- * Get the write quorum size for BookKeeper client, which is used to
- * indicate how many redundant copies of each ledger entry is written.
- *
- * @return write quorum size for BookKeeper client.
- */
- public int getBkWriteQuorumSize() {
- if (conf.containsKey(BK_WRITE_QUORUM_SIZE)) {
- return conf.getInt(BK_WRITE_QUORUM_SIZE, 2);
- } else {
- return getBkQuorumSize();
- }
- }
-
- /**
- * Get the ack quorum size for BookKeeper client.
- *
- * @return ack quorum size for BookKeeper client.
- */
- public int getBkAckQuorumSize() {
- if (conf.containsKey(BK_ACK_QUORUM_SIZE)) {
- return conf.getInt(BK_ACK_QUORUM_SIZE, 2);
- } else {
- return getBkQuorumSize();
- }
- }
-
- /**
- * This parameter is used when BookKeeper is the persistence storage,
- * and indicates when the number of entries stored in a ledger reach
- * the threshold, hub server will open a new ledger to write.
- *
- * @return max entries per ledger
- */
- public long getMaxEntriesPerLedger() {
- return conf.getLong(MAX_ENTRIES_PER_LEDGER, 0L);
- }
-
- /**
- * Get the tolerance percentage for the rebalancer. The rebalancer will not
- * shed load if it's current load is less than average + average*tolerancePercentage/100.0
- *
- * @return the tolerance percentage for the rebalancer.
- */
- public double getRebalanceTolerance() {
- return conf.getDouble(REBALANCE_TOLERANCE_PERCENTAGE, 10.0);
- }
-
- /**
- * Get the maximum load the rebalancer can shed at once. Default is 50.
- * @return
- */
- public HubLoad getRebalanceMaxShed() {
- return new HubLoad(conf.getLong(REBALANCE_MAX_SHED, 50));
- }
-
- /**
- * Get the interval(in seconds) between rebalancing attempts. The default is
- * 5 minutes.
- * @return
- */
- public long getRebalanceInterval() {
- return conf.getLong(REBALANCE_INTERVAL_SEC, 300);
- }
-
- /*
- * Is this a valid configuration that we can run with? This code might grow
- * over time.
- */
- public void validate() throws ConfigurationException {
- if (!getZkPrefix().startsWith("/")) {
- throw new ConfigurationException(ZK_PREFIX + " must start with a /");
- }
- // Validate that if Regions exist and inter-region communication is SSL
- // enabled, that the Regions correspond to valid HedwigSocketAddresses,
- // namely that SSL ports are present.
- if (isInterRegionSSLEnabled() && getRegions().size() > 0) {
- for (String hubString : getRegions()) {
- HedwigSocketAddress hub = new HedwigSocketAddress(hubString);
- if (hub.getSSLSocketAddress() == null)
- throw new ConfigurationException("Region defined does not have required SSL port: " + hubString);
- }
- }
- // Validate that the Bookkeeper ensemble size >= quorum size.
- if (getBkEnsembleSize() < getBkWriteQuorumSize()) {
- throw new ConfigurationException("BK ensemble size (" + getBkEnsembleSize()
- + ") is less than the write quorum size (" + getBkWriteQuorumSize() + ")");
- }
-
- if (getBkWriteQuorumSize() < getBkAckQuorumSize()) {
- throw new ConfigurationException("BK write quorum size (" + getBkWriteQuorumSize()
- + ") is less than the ack quorum size (" + getBkAckQuorumSize() + ")");
- }
- // Validate that the rebalance tolerance percentage is not negative.
- if (getRebalanceTolerance() < 0.0) {
- throw new ConfigurationException("The rebalance tolerance percentage cannot be negative.");
- }
- // Validate that the maximum load to shed during a rebalance is not negative.
- if (getRebalanceMaxShed().getNumTopics() < 0L) {
- throw new ConfigurationException("The maximum load to shed during a rebalance cannot be negative.");
- }
- // add other checks here
- }
-
- /**
- * Get number of read ahead cache threads.
- *
- * @return number of read ahead cache threads.
- */
- public int getNumReadAheadCacheThreads() {
- return conf.getInt(NUM_READAHEAD_CACHE_THREADS, Runtime.getRuntime().availableProcessors());
- }
-
- /**
- * Get number of delivery threads
- *
- * @return number of delivery threads.
- */
- public int getNumDeliveryThreads() {
- return conf.getInt(NUM_DELIVERY_THREADS, Runtime.getRuntime().availableProcessors());
- }
-
- /**
- * Whether enable metadata manager based topic manager.
- *
- * @return true if enabled metadata manager based topic manager.
- */
- public boolean isMetadataManagerBasedTopicManagerEnabled() {
- return conf.getBoolean(METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED, false);
- }
-
- /**
- * Get metadata manager factory class.
- *
- * @return manager class
- */
- public Class<? extends MetadataManagerFactory> getMetadataManagerFactoryClass()
- throws ConfigurationException {
- return ReflectionUtils.getClass(conf, METADATA_MANAGER_FACTORY_CLASS,
- null, MetadataManagerFactory.class,
- defaultLoader);
- }
-
- /**
- * Set metadata manager factory class name
- *
- * @param managerClsName
- * Manager Class Name
- * @return server configuration
- */
- public ServerConfiguration setMetadataManagerFactoryName(String managerClsName) {
- conf.setProperty(METADATA_MANAGER_FACTORY_CLASS, managerClsName);
- return this;
- }
-
- /**
- * Get metastore implementation class.
- *
- * @return metastore implementation class name.
- */
- public String getMetastoreImplClass() {
- return conf.getString(METASTORE_IMPL_CLASS);
- }
-
- /**
- * Get max entries per scan in metastore.
- *
- * @return max entries per scan in metastore.
- */
- public int getMetastoreMaxEntriesPerScan() {
- return conf.getInt(METASTORE_MAX_ENTRIES_PER_SCAN, 50);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
deleted file mode 100644
index fd8234f..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TerminateJVMExceptionHandler implements Thread.UncaughtExceptionHandler {
- private static Logger logger = LoggerFactory.getLogger(TerminateJVMExceptionHandler.class);
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- logger.error("Uncaught exception in thread " + t.getName(), e);
- Runtime.getRuntime().exit(1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
deleted file mode 100644
index 3c4a562..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-
-public class TopicOpQueuer {
- /**
- * Map from topic to the queue of operations for that topic.
- */
- protected HashMap<ByteString, Queue<Runnable>> topic2ops = new HashMap<ByteString, Queue<Runnable>>();
-
- protected final ScheduledExecutorService scheduler;
-
- public TopicOpQueuer(ScheduledExecutorService scheduler) {
- this.scheduler = scheduler;
- }
-
- public interface Op extends Runnable {
- }
-
- public abstract class AsynchronousOp<T> implements Op {
- final public ByteString topic;
- final public Callback<T> cb;
- final public Object ctx;
-
- public AsynchronousOp(final ByteString topic, final Callback<T> cb, Object ctx) {
- this.topic = topic;
- this.cb = new Callback<T>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, exception);
- popAndRunNext(topic);
- }
-
- @Override
- public void operationFinished(Object ctx, T resultOfOperation) {
- cb.operationFinished(ctx, resultOfOperation);
- popAndRunNext(topic);
- }
- };
- this.ctx = ctx;
- }
- }
-
- public abstract class SynchronousOp implements Op {
- final public ByteString topic;
-
- public SynchronousOp(ByteString topic) {
- this.topic = topic;
- }
-
- @Override
- public final void run() {
- runInternal();
- popAndRunNext(topic);
- }
-
- protected abstract void runInternal();
-
- }
-
- protected synchronized void popAndRunNext(ByteString topic) {
- Queue<Runnable> ops = topic2ops.get(topic);
- if (!ops.isEmpty())
- ops.remove();
- if (!ops.isEmpty())
- scheduler.submit(ops.peek());
- }
-
- public void pushAndMaybeRun(ByteString topic, Op op) {
- int size;
- synchronized (this) {
- Queue<Runnable> ops = topic2ops.get(topic);
- if (ops == null) {
- ops = new LinkedList<Runnable>();
- topic2ops.put(topic, ops);
- }
- ops.add(op);
- size = ops.size();
- }
- if (size == 1)
- op.run();
- }
-
- public Runnable peek(ByteString topic) {
- return topic2ops.get(topic).peek();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
deleted file mode 100644
index 364ffdc..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-public class UnexpectedError extends Error {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public UnexpectedError(String msg) {
- super(msg);
- }
-
- public UnexpectedError(Throwable cause) {
- super(cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
deleted file mode 100644
index e7dc1fa..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.server.common.UnexpectedError;
-
-public class ChannelEndPoint implements DeliveryEndPoint, ChannelFutureListener {
-
- Channel channel;
-
- public Channel getChannel() {
- return channel;
- }
-
- Map<ChannelFuture, DeliveryCallback> callbacks = new HashMap<ChannelFuture, DeliveryCallback>();
-
- public ChannelEndPoint(Channel channel) {
- this.channel = channel;
- }
-
- public void close() {
- channel.close();
- }
-
- public void send(PubSubResponse response, DeliveryCallback callback) {
- ChannelFuture future = channel.write(response);
- callbacks.put(future, callback);
- future.addListener(this);
- }
-
- public void operationComplete(ChannelFuture future) throws Exception {
- DeliveryCallback callback = callbacks.get(future);
- callbacks.remove(future);
-
- if (callback == null) {
- throw new UnexpectedError("Could not locate callback for channel future");
- }
-
- if (future.isSuccess()) {
- callback.sendingFinished();
- } else {
- // treat all channel errors as permanent
- callback.permanentErrorOnSend();
- }
-
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof ChannelEndPoint) {
- ChannelEndPoint channelEndPoint = (ChannelEndPoint) obj;
- return channel.equals(channelEndPoint.channel);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return channel.hashCode();
- }
-
- @Override
- public String toString() {
- return channel.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
deleted file mode 100644
index 9ee63f1..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-public interface DeliveryCallback {
-
- public void sendingFinished();
-
- public void transientErrorOnSend();
-
- public void permanentErrorOnSend();
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
deleted file mode 100644
index 0774801..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-
-public interface DeliveryEndPoint {
-
- public void send(PubSubResponse response, DeliveryCallback callback);
-
- public void close();
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
deleted file mode 100644
index af3d150..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.util.Callback;
-
-public interface DeliveryManager {
- public void start();
-
- /**
- * Start serving a given subscription.
- *
- * @param topic
- * Topic Name
- * @param subscriberId
- * Subscriber Id
- * @param preferences
- * Subscription Preferences
- * @param seqIdToStartFrom
- * Message sequence id starting delivery from.
- * @param endPoint
- * End point to deliver messages to.
- * @param filter
- * Message filter used to filter messages before delivery.
- * @param callback
- * Callback instance.
- * @param ctx
- * Callback context.
- */
- public void startServingSubscription(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences,
- MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint,
- ServerMessageFilter filter,
- Callback<Void> callback, Object ctx);
-
- /**
- * Stop serving a given subscription.
- *
- * @param topic
- * Topic Name
- * @param subscriberId
- * Subscriber Id
- * @param event
- * Subscription event indicating the reason to stop the subscriber.
- * @param callback
- * Callback instance.
- * @param ctx
- * Callback context.
- */
- public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event,
- Callback<Void> callback, Object ctx);
-
- /**
- * Tell the delivery manager the point up to which a subscriber has consumed messages.
- *
- * @param topic
- * Topic Name
- * @param subscriberId
- * Subscriber Id
- * @param consumedSeqId
- * Max consumed seq id.
- */
- public void messageConsumed(ByteString topic, ByteString subscriberId,
- MessageSeqId consumedSeqId);
-
- /**
- * Stop delivery manager
- */
- public void stop();
-}