You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/13 18:05:58 UTC

svn commit: r1421384 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/ hedwig-server/src/main/java/org/apache/hedwig/server/common/ hedwig-server/src/main/java/org/apache/hedwig/server/meta/ hedwig-se...

Author: ivank
Date: Thu Dec 13 17:05:57 2012
New Revision: 1421384

URL: http://svn.apache.org/viewvc?rev=1421384&view=rev
Log:
BOOKKEEPER-262: Implement a meta store based hedwig metadata manager. (jiannan via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreUtils.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1421384&r1=1421383&r2=1421384&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Dec 13 17:05:57 2012
@@ -533,7 +533,9 @@ Release 4.1.0 - 2012-06-07
         BOOKKEEPER-97: collect pub/sub/consume statistics on hub server (sijie via ivank)
 
 	BOOKKEEPER-269: Review documentation for hedwig console client (sijie via fpj)
-	
+
+        BOOKKEEPER-262: Implement a meta store based hedwig metadata manager. (jiannan via ivank)
+
       hedwig-client/
 
 	BOOKKEEPER-271: Review documentation for message bounding (ivank via fpj)

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreUtils.java?rev=1421384&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreUtils.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore/MetastoreUtils.java Thu Dec 13 17:05:57 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.metastore;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.metastore.MSException.Code;
+import org.apache.bookkeeper.versioning.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides utilities for metastore.
+ */
+public class MetastoreUtils {
+
+    protected final static Logger logger = LoggerFactory.getLogger(MetastoreUtils.class);
+
+    static class MultiMetastoreCallback<T> implements MetastoreCallback<T> {
+
+        int rc = Code.OK.getCode();
+        final int numOps;
+        final AtomicInteger numFinished = new AtomicInteger(0);
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+
+        MultiMetastoreCallback(int numOps) {
+            this.numOps = numOps;
+        }
+
+        @Override
+        public void complete(int rc, T value, Object ctx) {
+            if (Code.OK.getCode() != rc) {
+                this.rc = rc;
+                doneLatch.countDown();
+                return;
+            }
+            if (numFinished.incrementAndGet() == numOps) {
+                doneLatch.countDown();
+            }
+        }
+
+        public void waitUntilAllFinished() throws MSException, InterruptedException {
+            doneLatch.await();
+            if (Code.OK.getCode() != rc) {
+                throw MSException.create(Code.get(rc));
+            }
+        }
+    }
+
+    static class SyncMetastoreCallback<T> implements MetastoreCallback<T> {
+
+        int rc;
+        T result;
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+
+        @Override
+        public void complete(int rc, T value, Object ctx) {
+            this.rc = rc;
+            result = value;
+            doneLatch.countDown();
+        }
+
+        public T getResult() throws MSException, InterruptedException {
+            doneLatch.await();
+
+            if (Code.OK.getCode() != rc) {
+                throw MSException.create(Code.get(rc));
+            }
+            return result;
+        }
+
+    }
+
+    /**
+     * Clean the given table.
+     *
+     * @param table
+     *          Metastore Table.
+     * @param numEntriesPerScan
+     *          Num entries per scan.
+     * @throws MSException
+     * @throws InterruptedException
+     */
+    public static void cleanTable(MetastoreTable table, int numEntriesPerScan)
+    throws MSException, InterruptedException {
+        // open cursor
+        SyncMetastoreCallback<MetastoreCursor> openCb = new SyncMetastoreCallback<MetastoreCursor>();
+        table.openCursor(MetastoreTable.NON_FIELDS, openCb, null);
+        MetastoreCursor cursor = openCb.getResult();
+        logger.info("Open cursor for table {} to clean entries.", table.getName());
+
+        List<String> keysToClean = new ArrayList<String>(numEntriesPerScan);
+        int numEntriesRemoved = 0;
+        while (cursor.hasMoreEntries()) {
+            logger.info("Fetching next {} entries from table {} to clean.",
+                         numEntriesPerScan, table.getName());
+            Iterator<MetastoreTableItem> iter = cursor.readEntries(numEntriesPerScan);
+            keysToClean.clear();
+            while (iter.hasNext()) {
+                MetastoreTableItem item = iter.next();
+                String key = item.getKey();
+                keysToClean.add(key);
+            }
+            if (keysToClean.isEmpty()) {
+                continue;
+            }
+
+            logger.info("Issuing deletes to delete keys {}", keysToClean);
+            // issue deletes to delete batch of keys
+            MultiMetastoreCallback<Void> mcb = new MultiMetastoreCallback<Void>(keysToClean.size());
+            for (String key : keysToClean) {
+                table.remove(key, Version.ANY, mcb, null);
+            }
+            mcb.waitUntilAllFinished();
+            numEntriesRemoved += keysToClean.size();
+            logger.info("Removed {} entries from table {}.", numEntriesRemoved, table.getName());
+        }
+
+        logger.info("Finished cleaning up table {}.", table.getName());
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1421384&r1=1421383&r2=1421384&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Thu Dec 13 17:05:57 2012
@@ -74,6 +74,10 @@ public class ServerConfiguration extends
     protected final static String METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED = "metadata_manager_based_topic_manager_enabled";
     protected final static String METADATA_MANAGER_FACTORY_CLASS = "metadata_manager_factory_class";
 
+    // metastore settings, only being used when METADATA_MANAGER_FACTORY_CLASS is MsMetadataManagerFactory
+    protected final static String METASTORE_IMPL_CLASS = "metastore_impl_class";
+    protected final static String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
+
     private static ClassLoader defaultLoader;
     static {
         defaultLoader = Thread.currentThread().getContextClassLoader();
@@ -542,4 +546,22 @@ public class ServerConfiguration extends
         conf.setProperty(METADATA_MANAGER_FACTORY_CLASS, managerClsName);
         return this;
     }
+
+    /**
+     * Get metastore implementation class.
+     *
+     * @return metastore implementation class name.
+     */
+    public String getMetastoreImplClass() {
+        return conf.getString(METASTORE_IMPL_CLASS);
+    }
+
+    /**
+     * Get max entries per scan in metastore.
+     *
+     * @return max entries per scan in metastore.
+     */
+    public int getMetastoreMaxEntriesPerScan() {
+        return conf.getInt(METASTORE_MAX_ENTRIES_PER_SCAN, 50);
+    }
 }

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java?rev=1421384&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java Thu Dec 13 17:05:57 2012
@@ -0,0 +1,866 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.meta;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.metastore.MetaStore;
+import org.apache.bookkeeper.metastore.MetastoreCallback;
+import org.apache.bookkeeper.metastore.MetastoreCursor;
+import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
+import org.apache.bookkeeper.metastore.MetastoreException;
+import org.apache.bookkeeper.metastore.MetastoreFactory;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
+import org.apache.bookkeeper.metastore.MetastoreTable;
+import org.apache.bookkeeper.metastore.MetastoreUtils;
+
+import static org.apache.bookkeeper.metastore.MetastoreTable.*;
+import org.apache.bookkeeper.metastore.MetastoreTableItem;
+import org.apache.bookkeeper.metastore.MSException;
+import org.apache.bookkeeper.metastore.Value;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.HubInfo;
+import org.apache.hedwig.util.Callback;
+
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * MetadataManagerFactory for plug-in metadata storage.
+ */
+public class MsMetadataManagerFactory extends MetadataManagerFactory {
+    protected final static Logger logger = LoggerFactory.getLogger(MsMetadataManagerFactory.class);
+
+    static final String UTF8 = "UTF-8";
+
+    static final int CUR_VERSION = 1;
+
+    static final String OWNER_TABLE_NAME = "owner";
+    static final String PERSIST_TABLE_NAME = "persist";
+    static final String SUB_TABLE_NAME = "sub";
+
+    static class SyncResult<T> {
+        T value;
+        int rc;
+        boolean finished = false;
+
+        public synchronized void complete(int rc, T value) {
+            this.rc = rc;
+            this.value = value;
+            finished = true;
+
+            notify();
+        }
+
+        public synchronized void block() throws InterruptedException {
+            while (!finished) {
+                wait();
+            }
+        }
+
+        public int getReturnCode() {
+            return rc;
+        }
+
+        public T getValue() {
+            return value;
+        }
+    }
+
+    MetaStore metastore;
+    MetastoreTable ownerTable;
+    MetastoreTable persistTable;
+    MetastoreScannableTable subTable;
+    ServerConfiguration cfg;
+
+    @Override
+    public MetadataManagerFactory initialize(ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException {
+        if (CUR_VERSION != version) {
+            throw new IOException("Incompatible MsMetadataManagerFactory version " + version
+                    + " found, expected version " + CUR_VERSION);
+        }
+        this.cfg = cfg;
+        try {
+            metastore = MetastoreFactory.createMetaStore(cfg.getMetastoreImplClass());
+            // TODO: need to store metastore class and version in some place.
+            metastore.init(cfg.getConf(), metastore.getVersion());
+        } catch (Exception e) {
+            throw new IOException("Load metastore failed : ", e);
+        }
+
+        try {
+            ownerTable = metastore.createTable(OWNER_TABLE_NAME);
+            if (ownerTable == null) {
+                throw new IOException("create owner table failed");
+            }
+
+            persistTable = metastore.createTable(PERSIST_TABLE_NAME);
+            if (persistTable == null) {
+                throw new IOException("create persistence table failed");
+            }
+
+            subTable = metastore.createScannableTable(SUB_TABLE_NAME);
+            if (subTable == null) {
+                throw new IOException("create subscription table failed");
+            }
+        } catch (MetastoreException me) {
+            throw new IOException("Failed to create tables : ", me);
+        }
+
+        return this;
+    }
+
+    @Override
+    public int getCurrentVersion() {
+        return CUR_VERSION;
+    }
+
+    @Override
+    public void shutdown() {
+        if (metastore == null) {
+            return;
+        }
+
+        if (ownerTable != null) {
+            ownerTable.close();
+            ownerTable = null;
+        }
+
+        if (persistTable != null) {
+            persistTable.close();
+            persistTable = null;
+        }
+
+        if (subTable != null) {
+            subTable.close();
+            subTable = null;
+        }
+
+        metastore.close();
+        metastore = null;
+    }
+
+    @Override
+    public Iterator<ByteString> getTopics() throws IOException {
+        SyncResult<MetastoreCursor> syn = new SyncResult<MetastoreCursor>();
+        persistTable.openCursor(NON_FIELDS, new MetastoreCallback<MetastoreCursor>() {
+            public void complete(int rc, MetastoreCursor cursor, Object ctx) {
+                @SuppressWarnings("unchecked")
+                SyncResult<MetastoreCursor> syn = (SyncResult<MetastoreCursor>) ctx;
+                syn.complete(rc, cursor);
+            }
+        }, syn);
+        try {
+            syn.block();
+        } catch (Exception e) {
+            throw new IOException("Interrupted on getting topics list : ", e);
+        }
+
+        if (syn.getReturnCode() != MSException.Code.OK.getCode()) {
+            throw new IOException("Failed to get topics : ", MSException.create(
+                    MSException.Code.get(syn.getReturnCode()), ""));
+        }
+
+        final MetastoreCursor cursor = syn.getValue();
+        return new Iterator<ByteString>() {
+            Iterator<MetastoreTableItem> itemIter = null;
+
+            @Override
+            public boolean hasNext() {
+                while (null == itemIter || !itemIter.hasNext()) {
+                    if (!cursor.hasMoreEntries()) {
+                        return false;
+                    }
+
+                    try {
+                        itemIter = cursor.readEntries(cfg.getMetastoreMaxEntriesPerScan());
+                    } catch (MSException mse) {
+                        logger.warn("Interrupted when iterating the topics list : ", mse);
+                        return false;
+                    }
+                }
+                return true;
+            }
+
+            @Override
+            public ByteString next() {
+                MetastoreTableItem t = itemIter.next();
+                return ByteString.copyFromUtf8(t.getKey());
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Doesn't support remove topic from topic iterator.");
+            }
+        };
+    }
+
+    @Override
+    public TopicOwnershipManager newTopicOwnershipManager() {
+        return new MsTopicOwnershipManagerImpl(ownerTable);
+    }
+
+    static class MsTopicOwnershipManagerImpl implements TopicOwnershipManager {
+
+        static final String OWNER_FIELD = "owner";
+
+        final MetastoreTable ownerTable;
+
+        MsTopicOwnershipManagerImpl(MetastoreTable ownerTable) {
+            this.ownerTable = ownerTable;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // do nothing
+        }
+
+        @Override
+        public void readOwnerInfo(final ByteString topic, final Callback<Versioned<HubInfo>> callback, Object ctx) {
+            ownerTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() {
+                @Override
+                public void complete(int rc, Versioned<Value> value, Object ctx) {
+                    if (MSException.Code.NoKey.getCode() == rc) {
+                        callback.operationFinished(ctx, null);
+                        return;
+                    }
+
+                    if (MSException.Code.OK.getCode() != rc) {
+                        logErrorAndFinishOperation("Could not read ownership for topic " + topic.toStringUtf8(),
+                                callback, ctx, rc);
+                        return;
+                    }
+
+                    HubInfo owner = null;
+                    try {
+                        byte[] data = value.getValue().getField(OWNER_FIELD);
+                        if (data != null) {
+                            owner = HubInfo.parse(new String(data));
+                        }
+                    } catch (HubInfo.InvalidHubInfoException ihie) {
+                        logger.warn("Failed to parse hub info for topic " + topic.toStringUtf8(), ihie);
+                    }
+                    Version version = value.getVersion();
+                    callback.operationFinished(ctx, new Versioned<HubInfo>(owner, version));
+                }
+            }, ctx);
+        }
+
+        @Override
+        public void writeOwnerInfo(final ByteString topic, final HubInfo owner, final Version version,
+                final Callback<Version> callback, Object ctx) {
+            Value value = new Value();
+            value.setField(OWNER_FIELD, owner.toString().getBytes());
+
+            ownerTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() {
+                @Override
+                public void complete(int rc, Version ver, Object ctx) {
+                    if (MSException.Code.OK.getCode() == rc) {
+                        callback.operationFinished(ctx, ver);
+                        return;
+                    } else if (MSException.Code.NoKey.getCode() == rc) {
+                        // no node
+                        callback.operationFailed(
+                                ctx,
+                                PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic "
+                                        + topic.toStringUtf8()));
+                        return;
+                    } else if (MSException.Code.KeyExists.getCode() == rc) {
+                        // key exists
+                        callback.operationFailed(
+                                ctx,
+                                PubSubException.create(StatusCode.TOPIC_OWNER_INFO_EXISTS, "Owner info of topic "
+                                        + topic.toStringUtf8() + " existed."));
+                        return;
+                    } else if (MSException.Code.BadVersion.getCode() == rc) {
+                        // bad version
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                "Bad version provided to update owner info of topic " + topic.toStringUtf8()));
+                        return;
+                    } else {
+                        logErrorAndFinishOperation("Failed to update ownership of topic " + topic.toStringUtf8()
+                                + " to " + owner, callback, ctx, rc);
+                        return;
+                    }
+                }
+            }, ctx);
+        }
+
+        @Override
+        public void deleteOwnerInfo(final ByteString topic, Version version, final Callback<Void> callback,
+                Object ctx) {
+            ownerTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() {
+                @Override
+                public void complete(int rc, Void value, Object ctx) {
+                    if (MSException.Code.OK.getCode() == rc) {
+                        logger.debug("Successfully deleted owner info for topic {}", topic.toStringUtf8());
+                        callback.operationFinished(ctx, null);
+                        return;
+                    } else if (MSException.Code.NoKey.getCode() == rc) {
+                        // no node
+                        callback.operationFailed(
+                                ctx,
+                                PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic "
+                                        + topic.toStringUtf8()));
+                        return;
+                    } else if (MSException.Code.BadVersion.getCode() == rc) {
+                        // bad version
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                "Bad version provided to delete owner info of topic " + topic.toStringUtf8()));
+                        return;
+                    } else {
+                        logErrorAndFinishOperation("Failed to delete owner info for topic " + topic.toStringUtf8(),
+                                callback, ctx, rc);
+                        return;
+                    }
+                }
+            }, ctx);
+        }
+    }
+
+    @Override
+    public TopicPersistenceManager newTopicPersistenceManager() {
+        return new MsTopicPersistenceManagerImpl(persistTable);
+    }
+
+    static class MsTopicPersistenceManagerImpl implements TopicPersistenceManager {
+
+        static final String PERSIST_FIELD = "prst";
+
+        final MetastoreTable persistTable;
+
+        MsTopicPersistenceManagerImpl(MetastoreTable persistTable) {
+            this.persistTable = persistTable;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // do nothing
+        }
+
+        @Override
+        public void readTopicPersistenceInfo(final ByteString topic, final Callback<Versioned<LedgerRanges>> callback,
+                Object ctx) {
+            persistTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() {
+                @Override
+                public void complete(int rc, Versioned<Value> value, Object ctx) {
+                    if (MSException.Code.OK.getCode() == rc) {
+                        byte[] data = value.getValue().getField(PERSIST_FIELD);
+                        if (data != null) {
+                            parseAndReturnTopicLedgerRanges(topic, data, value.getVersion(), callback, ctx);
+                        } else { // null data is same as NoKey
+                            callback.operationFinished(ctx, null);
+                        }
+                    } else if (MSException.Code.NoKey.getCode() == rc) {
+                        callback.operationFinished(ctx, null);
+                    } else {
+                        logErrorAndFinishOperation("Could not read ledgers node for topic " + topic.toStringUtf8(),
+                                callback, ctx, rc);
+                    }
+                }
+            }, ctx);
+        }
+
+        /**
+         * Parse ledger ranges data and return it thru callback.
+         *
+         * @param topic
+         *            Topic name
+         * @param data
+         *            Topic Ledger Ranges data
+         * @param version
+         *            Version of the topic ledger ranges data
+         * @param callback
+         *            Callback to return ledger ranges
+         * @param ctx
+         *            Context of the callback
+         */
+        private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, Version version,
+                Callback<Versioned<LedgerRanges>> callback, Object ctx) {
+            try {
+                LedgerRanges.Builder rangesBuilder = LedgerRanges.newBuilder();
+                TextFormat.merge(new String(data, UTF8), rangesBuilder);
+                LedgerRanges lr = rangesBuilder.build();
+                Versioned<LedgerRanges> ranges = new Versioned<LedgerRanges>(lr, version);
+                callback.operationFinished(ctx, ranges);
+            } catch (ParseException e) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Ledger ranges for topic ").append(topic.toStringUtf8())
+                        .append(" could not be deserialized.");
+                String msg = sb.toString();
+                logger.error(msg, e);
+                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+            } catch (UnsupportedEncodingException uee) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Ledger ranges for topic ").append(topic.toStringUtf8()).append(" is not UTF-8 encoded.");
+                String msg = sb.toString();
+                logger.error(msg, uee);
+                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+            }
+        }
+
+        @Override
+        public void writeTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, final Version version,
+                final Callback<Version> callback, Object ctx) {
+            Value value = new Value();
+            value.setField(PERSIST_FIELD, TextFormat.printToString(ranges).getBytes());
+
+            persistTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() {
+                @Override
+                public void complete(int rc, Version ver, Object ctx) {
+                    if (MSException.Code.OK.getCode() == rc) {
+                        callback.operationFinished(ctx, ver);
+                        return;
+                    } else if (MSException.Code.NoKey.getCode() == rc) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
+                                "No persistence info found for topic " + topic.toStringUtf8()));
+                        return;
+                    } else if (MSException.Code.KeyExists.getCode() == rc) {
+                        // key exists
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS,
+                                "Persistence info of topic " + topic.toStringUtf8() + " existed."));
+                        return;
+                    } else if (MSException.Code.BadVersion.getCode() == rc) {
+                        // bad version
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                "Bad version provided to update persistence info of topic " + topic.toStringUtf8()));
+                        return;
+                    } else {
+                        logErrorAndFinishOperation("Could not write ledgers node for topic " + topic.toStringUtf8(),
+                                callback, ctx, rc);
+                    }
+                }
+            }, ctx);
+        }
+
+        @Override
+        public void deleteTopicPersistenceInfo(final ByteString topic, final Version version,
+                final Callback<Void> callback, Object ctx) {
+            persistTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() {
+                @Override
+                public void complete(int rc, Void value, Object ctx) {
+                    if (MSException.Code.OK.getCode() == rc) {
+                        logger.debug("Successfully deleted persistence info for topic {}.", topic.toStringUtf8());
+                        callback.operationFinished(ctx, null);
+                        return;
+                    } else if (MSException.Code.NoKey.getCode() == rc) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
+                                "No persistence info found for topic " + topic.toStringUtf8()));
+                        return;
+                    } else if (MSException.Code.BadVersion.getCode() == rc) {
+                        // bad version
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                "Bad version provided to delete persistence info of topic " + topic.toStringUtf8()));
+                        return;
+                    } else {
+                        logErrorAndFinishOperation("Failed to delete persistence info topic: " + topic.toStringUtf8()
+                                + ", version: " + version, callback, ctx, rc, StatusCode.SERVICE_DOWN);
+                        return;
+                    }
+                }
+            }, ctx);
+        }
+    }
+
+    @Override
+    public SubscriptionDataManager newSubscriptionDataManager() {
+        return new MsSubscriptionDataManagerImpl(cfg, subTable);
+    }
+
+    static class MsSubscriptionDataManagerImpl implements SubscriptionDataManager {
+
+        static final String SUB_STATE_FIELD = "sub_state";
+        static final String SUB_PREFS_FIELD = "sub_preferences";
+
+        static final char TOPIC_SUB_FIRST_SEPARATOR = '\001';
+        static final char TOPIC_SUB_LAST_SEPARATOR = '\002';
+
+        final ServerConfiguration cfg;
+        final MetastoreScannableTable subTable;
+
+        MsSubscriptionDataManagerImpl(ServerConfiguration cfg, MetastoreScannableTable subTable) {
+            this.cfg = cfg;
+            this.subTable = subTable;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // do nothing
+        }
+
+        private String getSubscriptionKey(ByteString topic, ByteString subscriberId) {
+            return new StringBuilder(topic.toStringUtf8()).append(TOPIC_SUB_FIRST_SEPARATOR)
+                    .append(subscriberId.toStringUtf8()).toString();
+        }
+
+        private Value subscriptionData2Value(SubscriptionData subData) {
+            Value value = new Value();
+            if (subData.hasState()) {
+                value.setField(SUB_STATE_FIELD, TextFormat.printToString(subData.getState()).getBytes());
+            }
+            if (subData.hasPreferences()) {
+                value.setField(SUB_PREFS_FIELD, TextFormat.printToString(subData.getPreferences()).getBytes());
+            }
+            return value;
+        }
+
+        @Override
+        public void createSubscriptionData(final ByteString topic, final ByteString subscriberId,
+                final SubscriptionData subData, final Callback<Version> callback, Object ctx) {
+            String key = getSubscriptionKey(topic, subscriberId);
+            Value value = subscriptionData2Value(subData);
+
+            subTable.put(key, value, Version.NEW, new MetastoreCallback<Version>() {
+                @Override
+                public void complete(int rc, Version ver, Object ctx) {
+                    if (rc == MSException.Code.OK.getCode()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Successfully create subscription for topic: " + topic.toStringUtf8()
+                                    + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
+                                    + SubscriptionStateUtils.toString(subData));
+                        }
+                        callback.operationFinished(ctx, ver);
+                    } else if (rc == MSException.Code.KeyExists.getCode()) {
+                        callback.operationFailed(ctx, PubSubException.create(
+                                StatusCode.SUBSCRIPTION_STATE_EXISTS,
+                                "Subscription data for (topic:" + topic.toStringUtf8() + ", subscriber:"
+                                        + subscriberId.toStringUtf8() + ") existed."));
+                        return;
+                    } else {
+                        logErrorAndFinishOperation("Failed to create topic: " + topic.toStringUtf8()
+                                + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
+                                + SubscriptionStateUtils.toString(subData), callback, ctx, rc);
+                    }
+                }
+            }, ctx);
+        }
+
+        @Override
+        public boolean isPartialUpdateSupported() {
+            // TODO: Here we assume Metastore support partial update, but this
+            // maybe incorrect.
+            return true;
+        }
+
+        @Override
+        public void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId,
+                final SubscriptionData subData, final Version version, final Callback<Version> callback,
+                final Object ctx) {
+            updateSubscriptionData(topic, subscriberId, subData, version, callback, ctx);
+        }
+
+        @Override
+        public void updateSubscriptionData(final ByteString topic, final ByteString subscriberId,
+                final SubscriptionData subData, final Version version, final Callback<Version> callback,
+                final Object ctx) {
+            String key = getSubscriptionKey(topic, subscriberId);
+            Value value = subscriptionData2Value(subData);
+
+            subTable.put(key, value, version, new MetastoreCallback<Version>() {
+                @Override
+                public void complete(int rc, Version version, Object ctx) {
+                    if (rc == MSException.Code.OK.getCode()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Successfully updated subscription data for topic: " + topic.toStringUtf8()
+                                    + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
+                                    + SubscriptionStateUtils.toString(subData) + ", version: " + version);
+                        }
+                        callback.operationFinished(ctx, version);
+                    } else if (rc == MSException.Code.NoKey.getCode()) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
+                                "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+                                        + subscriberId.toStringUtf8() + ")."));
+                        return;
+                    } else if (rc == MSException.Code.BadVersion.getCode()) {
+                        // bad version
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                "Bad version provided to update subscription data of topic " + topic.toStringUtf8()
+                                        + " subscriberId " + subscriberId));
+                        return;
+                    } else {
+                        logErrorAndFinishOperation(
+                                "Failed to update subscription data for topic: " + topic.toStringUtf8()
+                                        + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
+                                        + SubscriptionStateUtils.toString(subData) + ", version: " + version, callback,
+                                ctx, rc);
+                    }
+                }
+            }, ctx);
+        }
+
+        @Override
+        public void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version,
+                final Callback<Void> callback, Object ctx) {
+            String key = getSubscriptionKey(topic, subscriberId);
+            subTable.remove(key, version, new MetastoreCallback<Void>() {
+                @Override
+                public void complete(int rc, Void value, Object ctx) {
+                    if (rc == MSException.Code.OK.getCode()) {
+                        logger.debug("Successfully delete subscription for topic: {}, subscriberId: {}.",
+                                topic.toStringUtf8(), subscriberId.toStringUtf8());
+                        callback.operationFinished(ctx, null);
+                        return;
+                    } else if (rc == MSException.Code.BadVersion.getCode()) {
+                        // bad version
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                "Bad version provided to delete subscriptoin data of topic " + topic.toStringUtf8()
+                                        + " subscriberId " + subscriberId));
+                        return;
+                    } else if (rc == MSException.Code.NoKey.getCode()) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
+                                "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+                                        + subscriberId.toStringUtf8() + ")."));
+                        return;
+                    } else {
+                        logErrorAndFinishOperation("Failed to delete subscription topic: " + topic.toStringUtf8()
+                                + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc,
+                                StatusCode.SERVICE_DOWN);
+                    }
+                }
+            }, ctx);
+        }
+
+        private SubscriptionData value2SubscriptionData(Value value) throws ParseException,
+                UnsupportedEncodingException {
+            SubscriptionData.Builder builder = SubscriptionData.newBuilder();
+
+            byte[] stateData = value.getField(SUB_STATE_FIELD);
+            if (null != stateData) {
+                SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder();
+                TextFormat.merge(new String(stateData, UTF8), stateBuilder);
+                SubscriptionState state = stateBuilder.build();
+                builder.setState(state);
+            }
+
+            byte[] prefsData = value.getField(SUB_PREFS_FIELD);
+            if (null != prefsData) {
+                SubscriptionPreferences.Builder preferencesBuilder = SubscriptionPreferences.newBuilder();
+                TextFormat.merge(new String(prefsData, UTF8), preferencesBuilder);
+                SubscriptionPreferences preferences = preferencesBuilder.build();
+                builder.setPreferences(preferences);
+            }
+
+            return builder.build();
+        }
+
+        @Override
+        public void readSubscriptionData(final ByteString topic, final ByteString subscriberId,
+                final Callback<Versioned<SubscriptionData>> callback, Object ctx) {
+            String key = getSubscriptionKey(topic, subscriberId);
+            subTable.get(key, new MetastoreCallback<Versioned<Value>>() {
+                @Override
+                public void complete(int rc, Versioned<Value> value, Object ctx) {
+                    if (rc == MSException.Code.NoKey.getCode()) {
+                        callback.operationFinished(ctx, null);
+                        return;
+                    }
+
+                    if (rc != MSException.Code.OK.getCode()) {
+                        logErrorAndFinishOperation(
+                                "Could not read subscription data for topic: " + topic.toStringUtf8()
+                                        + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc);
+                        return;
+                    }
+
+                    try {
+                        Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>(
+                                value2SubscriptionData(value.getValue()), value.getVersion());
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
+                                    + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
+                                    + SubscriptionStateUtils.toString(subData.getValue()) + ", version: "
+                                    + subData.getVersion());
+                        }
+                        callback.operationFinished(ctx, subData);
+                    } catch (ParseException e) {
+                        StringBuilder sb = new StringBuilder();
+                        sb.append("Failed to deserialize subscription data for topic:").append(topic.toStringUtf8())
+                                .append(", subscriberId: ").append(subscriberId.toStringUtf8());
+                        String msg = sb.toString();
+                        logger.error(msg, e);
+                        callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                    } catch (UnsupportedEncodingException uee) {
+                        StringBuilder sb = new StringBuilder();
+                        sb.append("Subscription data for topic: ").append(topic.toStringUtf8())
+                                .append(", subscriberId: ").append(subscriberId.toStringUtf8())
+                                .append(" is not UFT-8 encoded");
+                        String msg = sb.toString();
+                        logger.error(msg, uee);
+                        callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                    }
+                }
+            }, ctx);
+        }
+
+        private String getSubscriptionPrefix(ByteString topic, char sep) {
+            return new StringBuilder(topic.toStringUtf8()).append(sep).toString();
+        }
+
+        private void readSubscriptions(final ByteString topic, final int keyLength, final MetastoreCursor cursor,
+                final Map<ByteString, Versioned<SubscriptionData>> topicSubs,
+                final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) {
+            if (!cursor.hasMoreEntries()) {
+                callback.operationFinished(ctx, topicSubs);
+                return;
+            }
+            ReadEntriesCallback readCb = new ReadEntriesCallback() {
+                @Override
+                public void complete(int rc, Iterator<MetastoreTableItem> items, Object ctx) {
+                    if (rc != MSException.Code.OK.getCode()) {
+                        logErrorAndFinishOperation("Could not read subscribers for cursor " + cursor,
+                                callback, ctx, rc);
+                        return;
+                    }
+                    while (items.hasNext()) {
+                        MetastoreTableItem item = items.next();
+                        final ByteString subscriberId = ByteString.copyFromUtf8(item.getKey().substring(keyLength));
+                        try {
+                            Versioned<Value> vv = item.getValue();
+                            Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>(
+                                    value2SubscriptionData(vv.getValue()), vv.getVersion());
+                            topicSubs.put(subscriberId, subData);
+                        } catch (ParseException e) {
+                            StringBuilder sb = new StringBuilder();
+                            sb.append("Failed to deserialize subscription data for topic: ")
+                                    .append(topic.toStringUtf8()).append(", subscriberId: ")
+                                    .append(subscriberId.toStringUtf8());
+                            String msg = sb.toString();
+                            logger.error(msg, e);
+                            callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                            return;
+                        } catch (UnsupportedEncodingException e) {
+                            StringBuilder sb = new StringBuilder();
+                            sb.append("Subscription data for topic: ").append(topic.toStringUtf8())
+                                    .append(", subscriberId: ").append(subscriberId.toStringUtf8())
+                                    .append(" is not UTF-8 encoded.");
+                            String msg = sb.toString();
+                            logger.error(msg, e);
+                            callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                            return;
+                        }
+                    }
+                    readSubscriptions(topic, keyLength, cursor, topicSubs, callback, ctx);
+                }
+            };
+            cursor.asyncReadEntries(cfg.getMetastoreMaxEntriesPerScan(), readCb, ctx);
+        }
+
+        @Override
+        public void readSubscriptions(final ByteString topic,
+                final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) {
+            final String firstKey = getSubscriptionPrefix(topic, TOPIC_SUB_FIRST_SEPARATOR);
+            String lastKey = getSubscriptionPrefix(topic, TOPIC_SUB_LAST_SEPARATOR);
+            subTable.openCursor(firstKey, true, lastKey, true, Order.ASC, ALL_FIELDS,
+                    new MetastoreCallback<MetastoreCursor>() {
+                        @Override
+                        public void complete(int rc, MetastoreCursor cursor, Object ctx) {
+                            if (rc != MSException.Code.OK.getCode()) {
+                                logErrorAndFinishOperation(
+                                        "Could not read subscribers for topic " + topic.toStringUtf8(), callback, ctx,
+                                        rc);
+                                return;
+                            }
+
+                            final Map<ByteString, Versioned<SubscriptionData>> topicSubs =
+                                    new ConcurrentHashMap<ByteString, Versioned<SubscriptionData>>();
+                            readSubscriptions(topic, firstKey.length(), cursor, topicSubs, callback, ctx);
+                        }
+                    }, ctx);
+        }
+    }
+
+    /**
+     * callback finish operation with exception specify by code, regardless of
+     * the value of return code rc.
+     */
+    private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc,
+            StatusCode code) {
+        logger.error(msg, MSException.create(MSException.Code.get(rc), ""));
+        callback.operationFailed(ctx, PubSubException.create(code, msg));
+    }
+
+    /**
+     * callback finish operation with corresponding PubSubException converted
+     * from return code rc.
+     */
+    private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc) {
+        StatusCode code;
+
+        if (rc == MSException.Code.NoKey.getCode()) {
+            code = StatusCode.NO_SUCH_TOPIC;
+        } else if (rc == MSException.Code.ServiceDown.getCode()) {
+            code = StatusCode.SERVICE_DOWN;
+        } else {
+            code = StatusCode.UNEXPECTED_CONDITION;
+        }
+
+        logErrorAndFinishOperation(msg, callback, ctx, rc, code);
+    }
+
+    @Override
+    public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException {
+        try {
+            int maxEntriesPerScan = cfg.getMetastoreMaxEntriesPerScan();
+
+            // clean topic ownership table.
+            logger.info("Cleaning topic ownership table ...");
+            MetastoreUtils.cleanTable(ownerTable, maxEntriesPerScan);
+            logger.info("Cleaned topic ownership table successfully.");
+
+            // clean topic subscription table.
+            logger.info("Cleaning topic subscription table ...");
+            MetastoreUtils.cleanTable(subTable, maxEntriesPerScan);
+            logger.info("Cleaned topic subscription table successfully.");
+
+            // clean topic persistence info table.
+            logger.info("Cleaning topic persistence info table ...");
+            MetastoreUtils.cleanTable(persistTable, maxEntriesPerScan);
+            logger.info("Cleaned topic persistence info table successfully.");
+        } catch (MSException mse) {
+            throw new IOException("Exception when formatting hedwig metastore : ", mse);
+        } catch (InterruptedException ie) {
+            throw new IOException("Interrupted when formatting hedwig metastore : ", ie);
+        }
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java?rev=1421384&r1=1421383&r2=1421384&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java Thu Dec 13 17:05:57 2012
@@ -23,6 +23,7 @@ package org.apache.hedwig.server.meta;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.bookkeeper.metastore.InMemoryMetaStore;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.server.meta.ZkMetadataManagerFactory;
@@ -49,12 +50,15 @@ public abstract class MetadataManagerFac
         super();
         conf = new ServerConfiguration();
         conf.setMetadataManagerFactoryName(metadataManagerFactoryCls);
+        conf.getConf().setProperty("metastore_impl_class", InMemoryMetaStore.class.getName());
+        InMemoryMetaStore.reset();
     }
 
     @Parameters
     public static Collection<Object[]> configs() {
         return Arrays.asList(new Object[][] {
-            { ZkMetadataManagerFactory.class.getName() }
+            { ZkMetadataManagerFactory.class.getName() },
+            { MsMetadataManagerFactory.class.getName() },
         });
     }