You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:25 UTC
[15/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
deleted file mode 100644
index 3838e96..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class UnsubscribeHandler extends BaseHandler {
- SubscriptionManager subMgr;
- DeliveryManager deliveryMgr;
- SubscriptionChannelManager subChannelMgr;
- // op stats
- final OpStats unsubStats;
-
- public UnsubscribeHandler(ServerConfiguration cfg,
- TopicManager tm,
- SubscriptionManager subMgr,
- DeliveryManager deliveryMgr,
- SubscriptionChannelManager subChannelMgr) {
- super(tm, cfg);
- this.subMgr = subMgr;
- this.deliveryMgr = deliveryMgr;
- this.subChannelMgr = subChannelMgr;
- unsubStats = ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE);
- }
-
- @Override
- public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
- if (!request.hasUnsubscribeRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing unsubscribe request data");
- unsubStats.incrementFailedOps();
- return;
- }
-
- final UnsubscribeRequest unsubRequest = request.getUnsubscribeRequest();
- final ByteString topic = request.getTopic();
- final ByteString subscriberId = unsubRequest.getSubscriberId();
-
- final long requestTime = MathUtils.now();
- subMgr.unsubscribe(topic, subscriberId, new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- unsubStats.incrementFailedOps();
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- // we should not close the channel in delivery manager
- // since client waits the response for closeSubscription request
- // client side would close the channel
- deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
- new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- unsubStats.incrementFailedOps();
- }
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- // remove the topic subscription from subscription channels
- subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
- channel);
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
- unsubStats.updateLatency(System.currentTimeMillis() - requestTime);
- }
- }, ctx);
- }
- }, null);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java
deleted file mode 100644
index f0081d9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.jmx;
-
-/**
- * An implementor of this interface is basiclly responsible for jmx beans.
- */
-public interface HedwigJMXService {
- /**
- * register jmx
- *
- * @param parent
- * Parent JMX Bean
- */
- public void registerJMX(HedwigMBeanInfo parent);
-
- /**
- * unregister jmx
- */
- public void unregisterJMX();
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java
deleted file mode 100644
index 866a217..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.jmx;
-
-import org.apache.zookeeper.jmx.ZKMBeanInfo;
-
-/**
- * Hedwig MBean info interface.
- */
-public interface HedwigMBeanInfo extends ZKMBeanInfo {
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java
deleted file mode 100644
index 563cae8..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.jmx;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.bookkeeper.jmx.BKMBeanRegistry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides a unified interface for registering/unregistering of
- * Hedwig MBeans with the platform MBean server.
- */
-public class HedwigMBeanRegistry extends BKMBeanRegistry {
-
- static final String SERVICE = "org.apache.HedwigServer";
-
- static HedwigMBeanRegistry instance = new HedwigMBeanRegistry();
-
- public static HedwigMBeanRegistry getInstance(){
- return instance;
- }
-
- @Override
- protected String getDomainName() {
- return SERVICE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
deleted file mode 100644
index a4253f8..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package org.apache.hedwig.server.meta;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.zookeeper.ZkUtils;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * This class encapsulates metadata manager layout information
- * that is persistently stored in zookeeper.
- * It provides parsing and serialization methods of such information.
- *
- */
-public class FactoryLayout {
- private static final Logger logger = LoggerFactory.getLogger(FactoryLayout.class);
-
- // metadata manager name
- public static final String NAME = "METADATA";
- // Znode name to store layout information
- public static final String LAYOUT_ZNODE = "LAYOUT";
- public static final String LSEP = "\n";
-
- private ManagerMeta managerMeta;
-
- /**
- * Construct metadata manager factory layout.
- *
- * @param meta
- * Meta describes what kind of factory used.
- */
- public FactoryLayout(ManagerMeta meta) {
- this.managerMeta = meta;
- }
-
- public static String getFactoryLayoutPath(StringBuilder sb, ServerConfiguration cfg) {
- return cfg.getZkManagersPrefix(sb).append("/").append(NAME)
- .append("/").append(LAYOUT_ZNODE).toString();
- }
-
- public ManagerMeta getManagerMeta() {
- return managerMeta;
- }
-
- /**
- * Store the factory layout into zookeeper
- *
- * @param zk
- * ZooKeeper Handle
- * @param cfg
- * Server Configuration Object
- * @throws KeeperException
- * @throws IOException
- * @throws InterruptedException
- */
- public void store(ZooKeeper zk, ServerConfiguration cfg)
- throws KeeperException, IOException, InterruptedException {
- String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
-
- byte[] layoutData = TextFormat.printToString(managerMeta).getBytes(UTF_8);
- ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, layoutData,
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
-
- @Override
- public int hashCode() {
- return managerMeta.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (null == o ||
- !(o instanceof FactoryLayout)) {
- return false;
- }
- FactoryLayout other = (FactoryLayout)o;
- return managerMeta.equals(other.managerMeta);
- }
-
- @Override
- public String toString() {
- return TextFormat.printToString(managerMeta);
- }
-
- /**
- * Read factory layout from zookeeper
- *
- * @param zk
- * ZooKeeper Client
- * @param cfg
- * Server configuration object
- * @return Factory layout, or null if none set in zookeeper
- */
- public static FactoryLayout readLayout(final ZooKeeper zk,
- final ServerConfiguration cfg)
- throws IOException, KeeperException {
- String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
- byte[] layoutData;
- try {
- layoutData = zk.getData(factoryLayoutPath, false, null);
- } catch (KeeperException.NoNodeException nne) {
- return null;
- } catch (InterruptedException ie) {
- throw new IOException(ie);
- }
- ManagerMeta meta;
- try {
- BufferedReader reader = new BufferedReader(
- new StringReader(new String(layoutData, UTF_8)));
- ManagerMeta.Builder metaBuilder = ManagerMeta.newBuilder();
- TextFormat.merge(reader, metaBuilder);
- meta = metaBuilder.build();
- } catch (InvalidProtocolBufferException ipbe) {
- throw new IOException("Corrupted factory layout : ", ipbe);
- }
-
- return new FactoryLayout(meta);
- }
-
- /**
- * Remove the factory layout from ZooKeeper.
- *
- * @param zk
- * ZooKeeper instance
- * @param cfg
- * Server configuration object
- * @throws KeeperException
- * @throws InterruptedException
- */
- public static void deleteLayout(ZooKeeper zk, ServerConfiguration cfg)
- throws KeeperException, InterruptedException {
- String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
- zk.delete(factoryLayoutPath, -1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
deleted file mode 100644
index 129d03d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.google.protobuf.ByteString;
-
-/**
- * Metadata Manager used to manage metadata used by hedwig.
- */
-public abstract class MetadataManagerFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(MetadataManagerFactory.class);
-
- /**
- * Return current factory version.
- *
- * @return current version used by factory.
- */
- public abstract int getCurrentVersion();
-
- /**
- * Initialize the metadata manager factory with given
- * configuration and version.
- *
- * @param cfg
- * Server configuration object
- * @param zk
- * ZooKeeper handler
- * @param version
- * Manager version
- * @return metadata manager factory
- * @throws IOException when fail to initialize the manager.
- */
- protected abstract MetadataManagerFactory initialize(
- ServerConfiguration cfg, ZooKeeper zk, int version)
- throws IOException;
-
- /**
- * Uninitialize the factory.
- *
- * @throws IOException when fail to shutdown the factory.
- */
- public abstract void shutdown() throws IOException;
-
- /**
- * Iterate over the topics list.
- * Used by HedwigConsole to list available topics.
- *
- * @return iterator of the topics list.
- * @throws IOException
- */
- public abstract Iterator<ByteString> getTopics() throws IOException;
-
- /**
- * Create topic persistence manager.
- *
- * @return topic persistence manager
- */
- public abstract TopicPersistenceManager newTopicPersistenceManager();
-
- /**
- * Create subscription data manager.
- *
- * @return subscription data manager.
- */
- public abstract SubscriptionDataManager newSubscriptionDataManager();
-
- /**
- * Create topic ownership manager.
- *
- * @return topic ownership manager.
- */
- public abstract TopicOwnershipManager newTopicOwnershipManager();
-
- /**
- * Format the metadata for Hedwig.
- *
- * @param cfg
- * Configuration instance
- * @param zk
- * ZooKeeper instance
- */
- public abstract void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException;
-
- /**
- * Create new Metadata Manager Factory.
- *
- * @param conf
- * Configuration Object.
- * @param zk
- * ZooKeeper Client Handle, talk to zk to know which manager factory is used.
- * @return new manager factory.
- * @throws IOException
- */
- public static MetadataManagerFactory newMetadataManagerFactory(
- final ServerConfiguration conf, final ZooKeeper zk)
- throws IOException, KeeperException, InterruptedException {
- Class<? extends MetadataManagerFactory> factoryClass;
- try {
- factoryClass = conf.getMetadataManagerFactoryClass();
- } catch (Exception e) {
- throw new IOException("Failed to get metadata manager factory class from configuration : ", e);
- }
- // check that the configured manager is
- // compatible with the existing layout
- FactoryLayout layout = FactoryLayout.readLayout(zk, conf);
- if (layout == null) { // no existing layout
- return createMetadataManagerFactory(conf, zk, factoryClass);
- }
- LOG.debug("read meta layout {}", layout);
-
- if (factoryClass != null &&
- !layout.getManagerMeta().getManagerImpl().equals(factoryClass.getName())) {
- throw new IOException("Configured metadata manager factory " + factoryClass.getName()
- + " does not match existing factory " + layout.getManagerMeta().getManagerImpl());
- }
- if (factoryClass == null) {
- // no factory specified in configuration
- String factoryClsName = layout.getManagerMeta().getManagerImpl();
- try {
- Class<?> theCls = Class.forName(factoryClsName);
- if (!MetadataManagerFactory.class.isAssignableFrom(theCls)) {
- throw new IOException("Wrong metadata manager factory " + factoryClsName);
- }
- factoryClass = theCls.asSubclass(MetadataManagerFactory.class);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException("No class found to instantiate metadata manager factory " + factoryClsName);
- }
- }
- // instantiate the metadata manager factory
- MetadataManagerFactory managerFactory;
- try {
- managerFactory = ReflectionUtils.newInstance(factoryClass);
- } catch (Throwable t) {
- throw new IOException("Failed to instantiate metadata manager factory : " + factoryClass, t);
- }
- return managerFactory.initialize(conf, zk, layout.getManagerMeta().getManagerVersion());
- }
-
- /**
- * Create metadata manager factory and write factory layout to ZooKeeper.
- *
- * @param cfg
- * Server Configuration object.
- * @param zk
- * ZooKeeper instance.
- * @param factoryClass
- * Metadata Manager Factory Class.
- * @return metadata manager factory instance.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- public static MetadataManagerFactory createMetadataManagerFactory(
- ServerConfiguration cfg, ZooKeeper zk,
- Class<? extends MetadataManagerFactory> factoryClass)
- throws IOException, KeeperException, InterruptedException {
- // use default manager if no one provided
- if (factoryClass == null) {
- factoryClass = ZkMetadataManagerFactory.class;
- }
-
- MetadataManagerFactory managerFactory;
- try {
- managerFactory = ReflectionUtils.newInstance(factoryClass);
- } catch (Throwable t) {
- throw new IOException("Fail to instantiate metadata manager factory : " + factoryClass, t);
- }
- ManagerMeta managerMeta = ManagerMeta.newBuilder()
- .setManagerImpl(factoryClass.getName())
- .setManagerVersion(managerFactory.getCurrentVersion())
- .build();
- FactoryLayout layout = new FactoryLayout(managerMeta);
- try {
- layout.store(zk, cfg);
- } catch (KeeperException.NodeExistsException nee) {
- FactoryLayout layout2 = FactoryLayout.readLayout(zk, cfg);
- if (!layout2.equals(layout)) {
- throw new IOException("Contention writing to layout to zookeeper, "
- + " other layout " + layout2 + " is incompatible with our "
- + "layout " + layout);
- }
- }
- return managerFactory.initialize(cfg, zk, layout.getManagerMeta().getManagerVersion());
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
deleted file mode 100644
index b44ca91..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
+++ /dev/null
@@ -1,867 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.TextFormat.ParseException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.bookkeeper.metastore.MetaStore;
-import org.apache.bookkeeper.metastore.MetastoreCallback;
-import org.apache.bookkeeper.metastore.MetastoreCursor;
-import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
-import org.apache.bookkeeper.metastore.MetastoreException;
-import org.apache.bookkeeper.metastore.MetastoreFactory;
-import org.apache.bookkeeper.metastore.MetastoreScannableTable;
-import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
-import org.apache.bookkeeper.metastore.MetastoreTable;
-import org.apache.bookkeeper.metastore.MetastoreUtils;
-
-import static org.apache.bookkeeper.metastore.MetastoreTable.*;
-import org.apache.bookkeeper.metastore.MetastoreTableItem;
-import org.apache.bookkeeper.metastore.MSException;
-import org.apache.bookkeeper.metastore.Value;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-
-import org.apache.zookeeper.ZooKeeper;
-
-/**
- * MetadataManagerFactory for plug-in metadata storage.
- */
-public class MsMetadataManagerFactory extends MetadataManagerFactory {
- protected final static Logger logger = LoggerFactory.getLogger(MsMetadataManagerFactory.class);
-
- static final String UTF8 = "UTF-8";
-
- static final int CUR_VERSION = 1;
-
- static final String OWNER_TABLE_NAME = "owner";
- static final String PERSIST_TABLE_NAME = "persist";
- static final String SUB_TABLE_NAME = "sub";
-
- static class SyncResult<T> {
- T value;
- int rc;
- boolean finished = false;
-
- public synchronized void complete(int rc, T value) {
- this.rc = rc;
- this.value = value;
- finished = true;
-
- notify();
- }
-
- public synchronized void block() throws InterruptedException {
- while (!finished) {
- wait();
- }
- }
-
- public int getReturnCode() {
- return rc;
- }
-
- public T getValue() {
- return value;
- }
- }
-
- MetaStore metastore;
- MetastoreTable ownerTable;
- MetastoreTable persistTable;
- MetastoreScannableTable subTable;
- ServerConfiguration cfg;
-
- @Override
- public MetadataManagerFactory initialize(ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException {
- if (CUR_VERSION != version) {
- throw new IOException("Incompatible MsMetadataManagerFactory version " + version
- + " found, expected version " + CUR_VERSION);
- }
- this.cfg = cfg;
- try {
- metastore = MetastoreFactory.createMetaStore(cfg.getMetastoreImplClass());
- // TODO: need to store metastore class and version in some place.
- metastore.init(cfg.getConf(), metastore.getVersion());
- } catch (Exception e) {
- throw new IOException("Load metastore failed : ", e);
- }
-
- try {
- ownerTable = metastore.createTable(OWNER_TABLE_NAME);
- if (ownerTable == null) {
- throw new IOException("create owner table failed");
- }
-
- persistTable = metastore.createTable(PERSIST_TABLE_NAME);
- if (persistTable == null) {
- throw new IOException("create persistence table failed");
- }
-
- subTable = metastore.createScannableTable(SUB_TABLE_NAME);
- if (subTable == null) {
- throw new IOException("create subscription table failed");
- }
- } catch (MetastoreException me) {
- throw new IOException("Failed to create tables : ", me);
- }
-
- return this;
- }
-
- @Override
- public int getCurrentVersion() {
- return CUR_VERSION;
- }
-
- @Override
- public void shutdown() {
- if (metastore == null) {
- return;
- }
-
- if (ownerTable != null) {
- ownerTable.close();
- ownerTable = null;
- }
-
- if (persistTable != null) {
- persistTable.close();
- persistTable = null;
- }
-
- if (subTable != null) {
- subTable.close();
- subTable = null;
- }
-
- metastore.close();
- metastore = null;
- }
-
- @Override
- public Iterator<ByteString> getTopics() throws IOException {
- SyncResult<MetastoreCursor> syn = new SyncResult<MetastoreCursor>();
- persistTable.openCursor(NON_FIELDS, new MetastoreCallback<MetastoreCursor>() {
- public void complete(int rc, MetastoreCursor cursor, Object ctx) {
- @SuppressWarnings("unchecked")
- SyncResult<MetastoreCursor> syn = (SyncResult<MetastoreCursor>) ctx;
- syn.complete(rc, cursor);
- }
- }, syn);
- try {
- syn.block();
- } catch (Exception e) {
- throw new IOException("Interrupted on getting topics list : ", e);
- }
-
- if (syn.getReturnCode() != MSException.Code.OK.getCode()) {
- throw new IOException("Failed to get topics : ", MSException.create(
- MSException.Code.get(syn.getReturnCode()), ""));
- }
-
- final MetastoreCursor cursor = syn.getValue();
- return new Iterator<ByteString>() {
- Iterator<MetastoreTableItem> itemIter = null;
-
- @Override
- public boolean hasNext() {
- while (null == itemIter || !itemIter.hasNext()) {
- if (!cursor.hasMoreEntries()) {
- return false;
- }
-
- try {
- itemIter = cursor.readEntries(cfg.getMetastoreMaxEntriesPerScan());
- } catch (MSException mse) {
- logger.warn("Interrupted when iterating the topics list : ", mse);
- return false;
- }
- }
- return true;
- }
-
- @Override
- public ByteString next() {
- MetastoreTableItem t = itemIter.next();
- return ByteString.copyFromUtf8(t.getKey());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Doesn't support remove topic from topic iterator.");
- }
- };
- }
-
- @Override
- public TopicOwnershipManager newTopicOwnershipManager() {
- return new MsTopicOwnershipManagerImpl(ownerTable);
- }
-
- static class MsTopicOwnershipManagerImpl implements TopicOwnershipManager {
-
- static final String OWNER_FIELD = "owner";
-
- final MetastoreTable ownerTable;
-
- MsTopicOwnershipManagerImpl(MetastoreTable ownerTable) {
- this.ownerTable = ownerTable;
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
-
- @Override
- public void readOwnerInfo(final ByteString topic, final Callback<Versioned<HubInfo>> callback, Object ctx) {
- ownerTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() {
- @Override
- public void complete(int rc, Versioned<Value> value, Object ctx) {
- if (MSException.Code.NoKey.getCode() == rc) {
- callback.operationFinished(ctx, null);
- return;
- }
-
- if (MSException.Code.OK.getCode() != rc) {
- logErrorAndFinishOperation("Could not read ownership for topic " + topic.toStringUtf8(),
- callback, ctx, rc);
- return;
- }
-
- HubInfo owner = null;
- try {
- byte[] data = value.getValue().getField(OWNER_FIELD);
- if (data != null) {
- owner = HubInfo.parse(new String(data, UTF_8));
- }
- } catch (HubInfo.InvalidHubInfoException ihie) {
- logger.warn("Failed to parse hub info for topic " + topic.toStringUtf8(), ihie);
- }
- Version version = value.getVersion();
- callback.operationFinished(ctx, new Versioned<HubInfo>(owner, version));
- }
- }, ctx);
- }
-
- @Override
- public void writeOwnerInfo(final ByteString topic, final HubInfo owner, final Version version,
- final Callback<Version> callback, Object ctx) {
- Value value = new Value();
- value.setField(OWNER_FIELD, owner.toString().getBytes(UTF_8));
-
- ownerTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() {
- @Override
- public void complete(int rc, Version ver, Object ctx) {
- if (MSException.Code.OK.getCode() == rc) {
- callback.operationFinished(ctx, ver);
- return;
- } else if (MSException.Code.NoKey.getCode() == rc) {
- // no node
- callback.operationFailed(
- ctx,
- PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic "
- + topic.toStringUtf8()));
- return;
- } else if (MSException.Code.KeyExists.getCode() == rc) {
- // key exists
- callback.operationFailed(
- ctx,
- PubSubException.create(StatusCode.TOPIC_OWNER_INFO_EXISTS, "Owner info of topic "
- + topic.toStringUtf8() + " existed."));
- return;
- } else if (MSException.Code.BadVersion.getCode() == rc) {
- // bad version
- callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
- "Bad version provided to update owner info of topic " + topic.toStringUtf8()));
- return;
- } else {
- logErrorAndFinishOperation("Failed to update ownership of topic " + topic.toStringUtf8()
- + " to " + owner, callback, ctx, rc);
- return;
- }
- }
- }, ctx);
- }
-
- @Override
- public void deleteOwnerInfo(final ByteString topic, Version version, final Callback<Void> callback,
- Object ctx) {
- ownerTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() {
- @Override
- public void complete(int rc, Void value, Object ctx) {
- if (MSException.Code.OK.getCode() == rc) {
- logger.debug("Successfully deleted owner info for topic {}", topic.toStringUtf8());
- callback.operationFinished(ctx, null);
- return;
- } else if (MSException.Code.NoKey.getCode() == rc) {
- // no node
- callback.operationFailed(
- ctx,
- PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic "
- + topic.toStringUtf8()));
- return;
- } else if (MSException.Code.BadVersion.getCode() == rc) {
- // bad version
- callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
- "Bad version provided to delete owner info of topic " + topic.toStringUtf8()));
- return;
- } else {
- logErrorAndFinishOperation("Failed to delete owner info for topic " + topic.toStringUtf8(),
- callback, ctx, rc);
- return;
- }
- }
- }, ctx);
- }
- }
-
- @Override
- public TopicPersistenceManager newTopicPersistenceManager() {
- return new MsTopicPersistenceManagerImpl(persistTable);
- }
-
- static class MsTopicPersistenceManagerImpl implements TopicPersistenceManager {
-
- static final String PERSIST_FIELD = "prst";
-
- final MetastoreTable persistTable;
-
- MsTopicPersistenceManagerImpl(MetastoreTable persistTable) {
- this.persistTable = persistTable;
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
-
- @Override
- public void readTopicPersistenceInfo(final ByteString topic, final Callback<Versioned<LedgerRanges>> callback,
- Object ctx) {
- persistTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() {
- @Override
- public void complete(int rc, Versioned<Value> value, Object ctx) {
- if (MSException.Code.OK.getCode() == rc) {
- byte[] data = value.getValue().getField(PERSIST_FIELD);
- if (data != null) {
- parseAndReturnTopicLedgerRanges(topic, data, value.getVersion(), callback, ctx);
- } else { // null data is same as NoKey
- callback.operationFinished(ctx, null);
- }
- } else if (MSException.Code.NoKey.getCode() == rc) {
- callback.operationFinished(ctx, null);
- } else {
- logErrorAndFinishOperation("Could not read ledgers node for topic " + topic.toStringUtf8(),
- callback, ctx, rc);
- }
- }
- }, ctx);
- }
-
- /**
- * Parse ledger ranges data and return it thru callback.
- *
- * @param topic
- * Topic name
- * @param data
- * Topic Ledger Ranges data
- * @param version
- * Version of the topic ledger ranges data
- * @param callback
- * Callback to return ledger ranges
- * @param ctx
- * Context of the callback
- */
- private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, Version version,
- Callback<Versioned<LedgerRanges>> callback, Object ctx) {
- try {
- LedgerRanges.Builder rangesBuilder = LedgerRanges.newBuilder();
- TextFormat.merge(new String(data, UTF8), rangesBuilder);
- LedgerRanges lr = rangesBuilder.build();
- Versioned<LedgerRanges> ranges = new Versioned<LedgerRanges>(lr, version);
- callback.operationFinished(ctx, ranges);
- } catch (ParseException e) {
- StringBuilder sb = new StringBuilder();
- sb.append("Ledger ranges for topic ").append(topic.toStringUtf8())
- .append(" could not be deserialized.");
- String msg = sb.toString();
- logger.error(msg, e);
- callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- } catch (UnsupportedEncodingException uee) {
- StringBuilder sb = new StringBuilder();
- sb.append("Ledger ranges for topic ").append(topic.toStringUtf8()).append(" is not UTF-8 encoded.");
- String msg = sb.toString();
- logger.error(msg, uee);
- callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- }
- }
-
- @Override
- public void writeTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, final Version version,
- final Callback<Version> callback, Object ctx) {
- Value value = new Value();
- value.setField(PERSIST_FIELD, TextFormat.printToString(ranges).getBytes(UTF_8));
-
- persistTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() {
- @Override
- public void complete(int rc, Version ver, Object ctx) {
- if (MSException.Code.OK.getCode() == rc) {
- callback.operationFinished(ctx, ver);
- return;
- } else if (MSException.Code.NoKey.getCode() == rc) {
- // no node
- callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
- "No persistence info found for topic " + topic.toStringUtf8()));
- return;
- } else if (MSException.Code.KeyExists.getCode() == rc) {
- // key exists
- callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS,
- "Persistence info of topic " + topic.toStringUtf8() + " existed."));
- return;
- } else if (MSException.Code.BadVersion.getCode() == rc) {
- // bad version
- callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
- "Bad version provided to update persistence info of topic " + topic.toStringUtf8()));
- return;
- } else {
- logErrorAndFinishOperation("Could not write ledgers node for topic " + topic.toStringUtf8(),
- callback, ctx, rc);
- }
- }
- }, ctx);
- }
-
- @Override
- public void deleteTopicPersistenceInfo(final ByteString topic, final Version version,
- final Callback<Void> callback, Object ctx) {
- persistTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() {
- @Override
- public void complete(int rc, Void value, Object ctx) {
- if (MSException.Code.OK.getCode() == rc) {
- logger.debug("Successfully deleted persistence info for topic {}.", topic.toStringUtf8());
- callback.operationFinished(ctx, null);
- return;
- } else if (MSException.Code.NoKey.getCode() == rc) {
- // no node
- callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
- "No persistence info found for topic " + topic.toStringUtf8()));
- return;
- } else if (MSException.Code.BadVersion.getCode() == rc) {
- // bad version
- callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
- "Bad version provided to delete persistence info of topic " + topic.toStringUtf8()));
- return;
- } else {
- logErrorAndFinishOperation("Failed to delete persistence info topic: " + topic.toStringUtf8()
- + ", version: " + version, callback, ctx, rc, StatusCode.SERVICE_DOWN);
- return;
- }
- }
- }, ctx);
- }
- }
-
- @Override
- public SubscriptionDataManager newSubscriptionDataManager() {
- return new MsSubscriptionDataManagerImpl(cfg, subTable);
- }
-
- static class MsSubscriptionDataManagerImpl implements SubscriptionDataManager {
-
- static final String SUB_STATE_FIELD = "sub_state";
- static final String SUB_PREFS_FIELD = "sub_preferences";
-
- static final char TOPIC_SUB_FIRST_SEPARATOR = '\001';
- static final char TOPIC_SUB_LAST_SEPARATOR = '\002';
-
- final ServerConfiguration cfg;
- final MetastoreScannableTable subTable;
-
- MsSubscriptionDataManagerImpl(ServerConfiguration cfg, MetastoreScannableTable subTable) {
- this.cfg = cfg;
- this.subTable = subTable;
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
-
- private String getSubscriptionKey(ByteString topic, ByteString subscriberId) {
- return new StringBuilder(topic.toStringUtf8()).append(TOPIC_SUB_FIRST_SEPARATOR)
- .append(subscriberId.toStringUtf8()).toString();
- }
-
- private Value subscriptionData2Value(SubscriptionData subData) {
- Value value = new Value();
- if (subData.hasState()) {
- value.setField(SUB_STATE_FIELD, TextFormat.printToString(subData.getState()).getBytes(UTF_8));
- }
- if (subData.hasPreferences()) {
- value.setField(SUB_PREFS_FIELD, TextFormat.printToString(subData.getPreferences()).getBytes(UTF_8));
- }
- return value;
- }
-
- @Override
- public void createSubscriptionData(final ByteString topic, final ByteString subscriberId,
- final SubscriptionData subData, final Callback<Version> callback, Object ctx) {
- String key = getSubscriptionKey(topic, subscriberId);
- Value value = subscriptionData2Value(subData);
-
- subTable.put(key, value, Version.NEW, new MetastoreCallback<Version>() {
- @Override
- public void complete(int rc, Version ver, Object ctx) {
- if (rc == MSException.Code.OK.getCode()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Successfully create subscription for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
- + SubscriptionStateUtils.toString(subData));
- }
- callback.operationFinished(ctx, ver);
- } else if (rc == MSException.Code.KeyExists.getCode()) {
- callback.operationFailed(ctx, PubSubException.create(
- StatusCode.SUBSCRIPTION_STATE_EXISTS,
- "Subscription data for (topic:" + topic.toStringUtf8() + ", subscriber:"
- + subscriberId.toStringUtf8() + ") existed."));
- return;
- } else {
- logErrorAndFinishOperation("Failed to create topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
- + SubscriptionStateUtils.toString(subData), callback, ctx, rc);
- }
- }
- }, ctx);
- }
-
- @Override
- public boolean isPartialUpdateSupported() {
- // TODO: Here we assume Metastore support partial update, but this
- // maybe incorrect.
- return true;
- }
-
- @Override
- public void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId,
- final SubscriptionData subData, final Version version, final Callback<Version> callback,
- final Object ctx) {
- updateSubscriptionData(topic, subscriberId, subData, version, callback, ctx);
- }
-
- @Override
- public void updateSubscriptionData(final ByteString topic, final ByteString subscriberId,
- final SubscriptionData subData, final Version version, final Callback<Version> callback,
- final Object ctx) {
- String key = getSubscriptionKey(topic, subscriberId);
- Value value = subscriptionData2Value(subData);
-
- subTable.put(key, value, version, new MetastoreCallback<Version>() {
- @Override
- public void complete(int rc, Version version, Object ctx) {
- if (rc == MSException.Code.OK.getCode()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Successfully updated subscription data for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
- + SubscriptionStateUtils.toString(subData) + ", version: " + version);
- }
- callback.operationFinished(ctx, version);
- } else if (rc == MSException.Code.NoKey.getCode()) {
- // no node
- callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
- "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:"
- + subscriberId.toStringUtf8() + ")."));
- return;
- } else if (rc == MSException.Code.BadVersion.getCode()) {
- // bad version
- callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
- "Bad version provided to update subscription data of topic " + topic.toStringUtf8()
- + " subscriberId " + subscriberId));
- return;
- } else {
- logErrorAndFinishOperation(
- "Failed to update subscription data for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
- + SubscriptionStateUtils.toString(subData) + ", version: " + version, callback,
- ctx, rc);
- }
- }
- }, ctx);
- }
-
- @Override
- public void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version,
- final Callback<Void> callback, Object ctx) {
- String key = getSubscriptionKey(topic, subscriberId);
- subTable.remove(key, version, new MetastoreCallback<Void>() {
- @Override
- public void complete(int rc, Void value, Object ctx) {
- if (rc == MSException.Code.OK.getCode()) {
- logger.debug("Successfully delete subscription for topic: {}, subscriberId: {}.",
- topic.toStringUtf8(), subscriberId.toStringUtf8());
- callback.operationFinished(ctx, null);
- return;
- } else if (rc == MSException.Code.BadVersion.getCode()) {
- // bad version
- callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
- "Bad version provided to delete subscriptoin data of topic " + topic.toStringUtf8()
- + " subscriberId " + subscriberId));
- return;
- } else if (rc == MSException.Code.NoKey.getCode()) {
- // no node
- callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
- "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:"
- + subscriberId.toStringUtf8() + ")."));
- return;
- } else {
- logErrorAndFinishOperation("Failed to delete subscription topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc,
- StatusCode.SERVICE_DOWN);
- }
- }
- }, ctx);
- }
-
- private SubscriptionData value2SubscriptionData(Value value) throws ParseException,
- UnsupportedEncodingException {
- SubscriptionData.Builder builder = SubscriptionData.newBuilder();
-
- byte[] stateData = value.getField(SUB_STATE_FIELD);
- if (null != stateData) {
- SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder();
- TextFormat.merge(new String(stateData, UTF8), stateBuilder);
- SubscriptionState state = stateBuilder.build();
- builder.setState(state);
- }
-
- byte[] prefsData = value.getField(SUB_PREFS_FIELD);
- if (null != prefsData) {
- SubscriptionPreferences.Builder preferencesBuilder = SubscriptionPreferences.newBuilder();
- TextFormat.merge(new String(prefsData, UTF8), preferencesBuilder);
- SubscriptionPreferences preferences = preferencesBuilder.build();
- builder.setPreferences(preferences);
- }
-
- return builder.build();
- }
-
- @Override
- public void readSubscriptionData(final ByteString topic, final ByteString subscriberId,
- final Callback<Versioned<SubscriptionData>> callback, Object ctx) {
- String key = getSubscriptionKey(topic, subscriberId);
- subTable.get(key, new MetastoreCallback<Versioned<Value>>() {
- @Override
- public void complete(int rc, Versioned<Value> value, Object ctx) {
- if (rc == MSException.Code.NoKey.getCode()) {
- callback.operationFinished(ctx, null);
- return;
- }
-
- if (rc != MSException.Code.OK.getCode()) {
- logErrorAndFinishOperation(
- "Could not read subscription data for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc);
- return;
- }
-
- try {
- Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>(
- value2SubscriptionData(value.getValue()), value.getVersion());
- if (logger.isDebugEnabled()) {
- logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
- + SubscriptionStateUtils.toString(subData.getValue()) + ", version: "
- + subData.getVersion());
- }
- callback.operationFinished(ctx, subData);
- } catch (ParseException e) {
- StringBuilder sb = new StringBuilder();
- sb.append("Failed to deserialize subscription data for topic:").append(topic.toStringUtf8())
- .append(", subscriberId: ").append(subscriberId.toStringUtf8());
- String msg = sb.toString();
- logger.error(msg, e);
- callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- } catch (UnsupportedEncodingException uee) {
- StringBuilder sb = new StringBuilder();
- sb.append("Subscription data for topic: ").append(topic.toStringUtf8())
- .append(", subscriberId: ").append(subscriberId.toStringUtf8())
- .append(" is not UFT-8 encoded");
- String msg = sb.toString();
- logger.error(msg, uee);
- callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- }
- }
- }, ctx);
- }
-
- private String getSubscriptionPrefix(ByteString topic, char sep) {
- return new StringBuilder(topic.toStringUtf8()).append(sep).toString();
- }
-
- private void readSubscriptions(final ByteString topic, final int keyLength, final MetastoreCursor cursor,
- final Map<ByteString, Versioned<SubscriptionData>> topicSubs,
- final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) {
- if (!cursor.hasMoreEntries()) {
- callback.operationFinished(ctx, topicSubs);
- return;
- }
- ReadEntriesCallback readCb = new ReadEntriesCallback() {
- @Override
- public void complete(int rc, Iterator<MetastoreTableItem> items, Object ctx) {
- if (rc != MSException.Code.OK.getCode()) {
- logErrorAndFinishOperation("Could not read subscribers for cursor " + cursor,
- callback, ctx, rc);
- return;
- }
- while (items.hasNext()) {
- MetastoreTableItem item = items.next();
- final ByteString subscriberId = ByteString.copyFromUtf8(item.getKey().substring(keyLength));
- try {
- Versioned<Value> vv = item.getValue();
- Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>(
- value2SubscriptionData(vv.getValue()), vv.getVersion());
- topicSubs.put(subscriberId, subData);
- } catch (ParseException e) {
- StringBuilder sb = new StringBuilder();
- sb.append("Failed to deserialize subscription data for topic: ")
- .append(topic.toStringUtf8()).append(", subscriberId: ")
- .append(subscriberId.toStringUtf8());
- String msg = sb.toString();
- logger.error(msg, e);
- callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- return;
- } catch (UnsupportedEncodingException e) {
- StringBuilder sb = new StringBuilder();
- sb.append("Subscription data for topic: ").append(topic.toStringUtf8())
- .append(", subscriberId: ").append(subscriberId.toStringUtf8())
- .append(" is not UTF-8 encoded.");
- String msg = sb.toString();
- logger.error(msg, e);
- callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- return;
- }
- }
- readSubscriptions(topic, keyLength, cursor, topicSubs, callback, ctx);
- }
- };
- cursor.asyncReadEntries(cfg.getMetastoreMaxEntriesPerScan(), readCb, ctx);
- }
-
- @Override
- public void readSubscriptions(final ByteString topic,
- final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) {
- final String firstKey = getSubscriptionPrefix(topic, TOPIC_SUB_FIRST_SEPARATOR);
- String lastKey = getSubscriptionPrefix(topic, TOPIC_SUB_LAST_SEPARATOR);
- subTable.openCursor(firstKey, true, lastKey, true, Order.ASC, ALL_FIELDS,
- new MetastoreCallback<MetastoreCursor>() {
- @Override
- public void complete(int rc, MetastoreCursor cursor, Object ctx) {
- if (rc != MSException.Code.OK.getCode()) {
- logErrorAndFinishOperation(
- "Could not read subscribers for topic " + topic.toStringUtf8(), callback, ctx,
- rc);
- return;
- }
-
- final Map<ByteString, Versioned<SubscriptionData>> topicSubs =
- new ConcurrentHashMap<ByteString, Versioned<SubscriptionData>>();
- readSubscriptions(topic, firstKey.length(), cursor, topicSubs, callback, ctx);
- }
- }, ctx);
- }
- }
-
- /**
- * callback finish operation with exception specify by code, regardless of
- * the value of return code rc.
- */
- private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc,
- StatusCode code) {
- logger.error(msg, MSException.create(MSException.Code.get(rc), ""));
- callback.operationFailed(ctx, PubSubException.create(code, msg));
- }
-
- /**
- * callback finish operation with corresponding PubSubException converted
- * from return code rc.
- */
- private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc) {
- StatusCode code;
-
- if (rc == MSException.Code.NoKey.getCode()) {
- code = StatusCode.NO_SUCH_TOPIC;
- } else if (rc == MSException.Code.ServiceDown.getCode()) {
- code = StatusCode.SERVICE_DOWN;
- } else {
- code = StatusCode.UNEXPECTED_CONDITION;
- }
-
- logErrorAndFinishOperation(msg, callback, ctx, rc, code);
- }
-
- @Override
- public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException {
- try {
- int maxEntriesPerScan = cfg.getMetastoreMaxEntriesPerScan();
-
- // clean topic ownership table.
- logger.info("Cleaning topic ownership table ...");
- MetastoreUtils.cleanTable(ownerTable, maxEntriesPerScan);
- logger.info("Cleaned topic ownership table successfully.");
-
- // clean topic subscription table.
- logger.info("Cleaning topic subscription table ...");
- MetastoreUtils.cleanTable(subTable, maxEntriesPerScan);
- logger.info("Cleaned topic subscription table successfully.");
-
- // clean topic persistence info table.
- logger.info("Cleaning topic persistence info table ...");
- MetastoreUtils.cleanTable(persistTable, maxEntriesPerScan);
- logger.info("Cleaned topic persistence info table successfully.");
- } catch (MSException mse) {
- throw new IOException("Exception when formatting hedwig metastore : ", mse);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted when formatting hedwig metastore : ", ie);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
deleted file mode 100644
index 0bebd45..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.Closeable;
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Manage subscription data.
- */
-public interface SubscriptionDataManager extends Closeable {
-
- /**
- * Create subscription data.
- *
- * @param topic
- * Topic name
- * @param subscriberId
- * Subscriber id
- * @param data
- * Subscription data
- * @param callback
- * Callback when subscription state created. New version would be returned.
- * {@link PubSubException.SubscriptionStateExistsException} is returned when subscription state
- * existed before.
- * @param ctx
- * Context of the callback
- */
- public void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
- Callback<Version> callback, Object ctx);
-
- /**
- * Whether the metadata manager supports partial update.
- *
- * @return true if the metadata manager supports partial update.
- * otherwise, return false.
- */
- public boolean isPartialUpdateSupported();
-
- /**
- * Update subscription data.
- *
- * @param topic
- * Topic name
- * @param subscriberId
- * Subscriber id
- * @param dataToUpdate
- * Subscription data to update. So it is a partial data, which contains
- * the part of data to update. The implementation should not replace
- * existing subscription data with <i>dataToUpdate</i> directly.
- * E.g. if there is only state in it, you should update state only.
- * @param version
- * Current version of subscription data.
- * @param callback
- * Callback when subscription state updated. New version would be returned.
- * {@link PubSubException.BadVersionException} is returned when version doesn't match,
- * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
- * is found.
- * @param ctx
- * Context of the callback
- */
- public void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToUpdate,
- Version version, Callback<Version> callback, Object ctx);
-
- /**
- * Replace subscription data.
- *
- * @param topic
- * Topic name
- * @param subscriberId
- * Subscriber id
- * @param dataToReplace
- * Subscription data to replace.
- * @param version
- * Current version of subscription data.
- * @param callback
- * Callback when subscription state updated. New version would be returned.
- * {@link PubSubException.BadVersionException} is returned when version doesn't match,
- * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
- * is found.
- * @param ctx
- * Context of the callback
- */
- public void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToReplace,
- Version version, Callback<Version> callback, Object ctx);
-
- /**
- * Remove subscription data.
- *
- * @param topic
- * Topic name
- * @param subscriberId
- * Subscriber id
- * @param version
- * Current version of subscription data.
- * @param callback
- * Callback when subscription state deleted
- * {@link PubSubException.BadVersionException} is returned when version doesn't match,
- * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
- * is found.
- * @param ctx
- * Context of the callback
- */
- public void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version,
- Callback<Void> callback, Object ctx);
-
- /**
- * Read subscription data with version.
- *
- * @param topic
- * Topic Name
- * @param subscriberId
- * Subscriber id
- * @param callback
- * Callback when subscription data read.
- * Null is returned when no subscription data is found.
- * @param ctx
- * Context of the callback
- */
- public void readSubscriptionData(ByteString topic, ByteString subscriberId,
- Callback<Versioned<SubscriptionData>> callback, Object ctx);
-
- /**
- * Read all subscriptions of a topic.
- *
- * @param topic
- * Topic name
- * @param callback
- * Callback to return subscriptions with version information
- * @param ctx
- * Contxt of the callback
- */
- public void readSubscriptions(ByteString topic, Callback<Map<ByteString, Versioned<SubscriptionData>>> cb,
- Object ctx);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java
deleted file mode 100644
index f17011c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.Closeable;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Manage topic ownership
- */
-public interface TopicOwnershipManager extends Closeable {
-
- /**
- * Read owner information of a topic.
- *
- * @param topic
- * Topic Name
- * @param callback
- * Callback to return hub info. If there is no owner info, return null;
- * If there is data but not valid owner info, return a Versioned object with null hub info;
- * If there is valid owner info, return versioned hub info.
- * @param ctx
- * Context of the callback
- */
- public void readOwnerInfo(ByteString topic, Callback<Versioned<HubInfo>> callback, Object ctx);
-
- /**
- * Write owner info for a specified topic.
- * A new owner info would be created if there is no one existed before.
- *
- * @param topic
- * Topic Name
- * @param owner
- * Owner hub info
- * @param version
- * Current version of owner info
- * If <code>version</code> is {@link Version.NEW}, create owner info.
- * {@link PubSubException.TopicOwnerInfoExistsException} is returned when
- * owner info existed before.
- * Otherwise, the owner info is updated only when
- * provided version equals to its current version.
- * {@link PubSubException.BadVersionException} is returned when version doesn't match,
- * {@link PubSubException.NoTopicOwnerInfoException} is returned when no owner info
- * found to update.
- * @param callback
- * Callback when owner info updated. New version would be returned if succeed to write.
- * @param ctx
- * Context of the callback
- */
- public void writeOwnerInfo(ByteString topic, HubInfo owner, Version version,
- Callback<Version> callback, Object ctx);
-
- /**
- * Delete owner info for a specified topic.
- *
- * @param topic
- * Topic Name
- * @param version
- * Current version of owner info
- * If <code>version</code> is {@link Version.ANY}, delete owner info no matter its current version.
- * Otherwise, the owner info is deleted only when
- * provided version equals to its current version.
- * @param callback
- * Callback when owner info deleted.
- * {@link PubSubException.NoTopicOwnerInfoException} is returned when no owner info.
- * {@link PubSubException.BadVersionException} is returned when version doesn't match.
- * @param ctx
- * Context of the callback.
- */
- public void deleteOwnerInfo(ByteString topic, Version version,
- Callback<Void> callback, Object ctx);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
deleted file mode 100644
index 69ee709..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.Closeable;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Manages the persistence metadata of topics.
- */
-public interface TopicPersistenceManager extends Closeable {
-
- /**
- * Reads the persistence info of a specified topic.
- *
- * @param topic
- * Topic name
- * @param callback
- * Callback invoked when the persistence info has been read.
- * If no persistence info is found, null is returned.
- * @param ctx
- * Context of the callback
- */
- public void readTopicPersistenceInfo(ByteString topic,
- Callback<Versioned<LedgerRanges>> callback, Object ctx);
-
- /**
- * Creates or updates the persistence info of a specified topic.
- *
- * @param topic
- * Topic name
- * @param ranges
- * Persistence info
- * @param version
- * Current version of the persistence info.
- * If <code>version</code> is {@link Version#NEW}, the persistence info is created;
- * {@link PubSubException.TopicPersistenceInfoExistsException} is returned when
- * persistence info already existed.
- * Otherwise, the persistence info is updated only when
- * the provided version equals its current version.
- * {@link PubSubException.BadVersionException} is returned when the version doesn't match,
- * {@link PubSubException.NoTopicPersistenceInfoException} is returned when no
- * persistence info is found to update.
- * @param callback
- * Callback invoked when the persistence info has been updated. The new version is returned.
- * @param ctx
- * Context of the callback
- */
- public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
- Callback<Version> callback, Object ctx);
-
- /**
- * Deletes the persistence info of a specified topic.
- * Currently used in test cases.
- *
- * @param topic
- * Topic name
- * @param version
- * Current version of the persistence info.
- * If <code>version</code> is {@link Version#ANY}, the persistence info is deleted regardless of its current version.
- * Otherwise, the persistence info is deleted only when
- * the provided version equals its current version.
- * @param callback
- * Callback reporting whether the deletion succeeded.
- * {@link PubSubException.NoTopicPersistenceInfoException} is returned when no persistence
- * info is found to delete.
- * {@link PubSubException.BadVersionException} is returned when the version doesn't match.
- * @param ctx
- * Context of the callback
- */
- public void deleteTopicPersistenceInfo(ByteString topic, Version version,
- Callback<Void> callback, Object ctx);
-
-}