You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/07/16 02:35:51 UTC

[GitHub] sijie closed pull request #1114: (WIP) BP-28 (prototype): use Etcd as metadata store

sijie closed pull request #1114: (WIP) BP-28 (prototype): use Etcd as metadata store
URL: https://github.com/apache/bookkeeper/pull/1114
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/metadata-stores/etcd/pom.xml b/metadata-stores/etcd/pom.xml
new file mode 100644
index 000000000..8d461e203
--- /dev/null
+++ b/metadata-stores/etcd/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.bookkeeper.metadata</groupId>
+        <artifactId>metadata-stores-parent</artifactId>
+        <version>4.7.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.bookkeeper.metadata</groupId>
+    <artifactId>metadata-stores-etcd</artifactId>
+    <name>Apache BookKeeper :: Metadata Stores :: Etcd</name>
+    <url>http://maven.apache.org</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${maven-checkstyle-plugin.version}</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>com.puppycrawl.tools</groupId>
+                        <artifactId>checkstyle</artifactId>
+                        <version>${puppycrawl.checkstyle.version}</version>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.bookkeeper</groupId>
+                        <artifactId>buildtools</artifactId>
+                        <version>${project.parent.version}</version>
+                    </dependency>
+                </dependencies>
+                <configuration>
+                    <configLocation>bookkeeper/checkstyle.xml</configLocation>
+                    <suppressionsLocation>bookkeeper/suppressions.xml</suppressionsLocation>
+                    <encoding>UTF-8</encoding>
+                    <consoleOutput>true</consoleOutput>
+                    <failsOnError>true</failsOnError>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>checkstyle</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.bookkeeper</groupId>
+            <artifactId>bookkeeper-server</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.coreos</groupId>
+            <artifactId>jetcd-core</artifactId>
+            <version>0.0.1</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdIdGenerator.java b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdIdGenerator.java
new file mode 100644
index 000000000..719cec9b1
--- /dev/null
+++ b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdIdGenerator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.metadata.etcd;
+
+import static org.apache.bookkeeper.metadata.etcd.EtcdUtils.getIdGenKey;
+
+import com.coreos.jetcd.KV;
+import com.coreos.jetcd.Txn;
+import com.coreos.jetcd.data.ByteSequence;
+import com.coreos.jetcd.data.KeyValue;
+import com.coreos.jetcd.kv.GetResponse;
+import com.coreos.jetcd.op.Cmp;
+import com.coreos.jetcd.op.Cmp.Op;
+import com.coreos.jetcd.op.CmpTarget;
+import com.coreos.jetcd.options.GetOption;
+import com.coreos.jetcd.options.PutOption;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
+/**
+ * Etcd id generator.
+ */
+class EtcdIdGenerator implements LedgerIdGenerator {
+
+    private final String scope;
+    private final KV kvClient;
+    private long nextId = -1L;
+    private long idKeyVersion = -1L;
+    private CompletableFuture<Void> fetchKeyFuture = null;
+    private ByteSequence idGenKey;
+
+    EtcdIdGenerator(KV kvClient, String scope) {
+       this.kvClient = kvClient;
+       this.scope = scope;
+       this.idGenKey = ByteSequence.fromString(getIdGenKey(scope));
+    }
+
+    @Override
+    public void generateLedgerId(GenericCallback<Long> cb) {
+        fetchIdKeyIfNecessary()
+            .thenRun(() -> executeGenerateLedgerIdTxn(cb))
+            .exceptionally(cause -> {
+                cb.operationComplete(Code.MetaStoreException, null);
+                return null;
+            });
+    }
+
+    private void executeGenerateLedgerIdTxn(GenericCallback<Long> cb) {
+        Txn txn;
+        final long idToAllocate;
+        final long idAllocatedAtVersion;
+        synchronized (this) {
+            Cmp keyCompare;
+            if (idKeyVersion < 0L) {
+                // key doesn't exist
+                keyCompare = new Cmp(
+                    idGenKey,
+                    Op.EQUAL,
+                    CmpTarget.value(ByteSequence.fromBytes(new byte[0])));
+            } else {
+                keyCompare = new Cmp(
+                    idGenKey,
+                    Op.EQUAL,
+                    CmpTarget.version(idKeyVersion));
+            }
+            txn = kvClient.txn()
+                .If(keyCompare)
+                .Then(com.coreos.jetcd.op.Op.put(
+                    idGenKey,
+                    ByteSequence.fromBytes(EtcdUtils.toBytes(nextId)),
+                    PutOption.DEFAULT))
+                .Else(com.coreos.jetcd.op.Op.get(
+                    idGenKey,
+                    GetOption.DEFAULT));
+            idToAllocate = nextId;
+            idAllocatedAtVersion = idKeyVersion;
+        }
+        txn.commit()
+            .thenAccept(resp -> {
+                if (resp.isSucceeded()) {
+                    // the creation succeed
+                    synchronized (this) {
+                        if (nextId == idToAllocate) {
+                            nextId = idToAllocate + 1;
+                        }
+                        if (idKeyVersion == idAllocatedAtVersion) {
+                            idKeyVersion = idAllocatedAtVersion + 1;
+                        }
+                    }
+                    cb.operationComplete(Code.OK, idToAllocate);
+                } else {
+                    // the creation failed
+                    GetResponse getResp = resp.getGetResponses().get(0);
+                    if (getResp.getCount() <= 0) {
+                        // key was deleted somehow
+                        cb.operationComplete(Code.UnexpectedConditionException, null);
+                    } else {
+                        KeyValue kv = getResp.getKvs().get(0);
+                        synchronized (this) {
+                            idKeyVersion = kv.getVersion();
+                            nextId = EtcdUtils.toLong(kv.getValue().getBytes(), 0) + 1;
+                        }
+                        // try again
+                        executeGenerateLedgerIdTxn(cb);
+                    }
+                }
+            })
+            .exceptionally(cause -> {
+                cb.operationComplete(Code.MetaStoreException, null);
+                return null;
+            });
+    }
+
+    private CompletableFuture<Void> fetchIdKeyIfNecessary() {
+        CompletableFuture<Void> keyFuture;
+        synchronized (this) {
+            if (null != fetchKeyFuture) {
+                keyFuture = fetchKeyFuture;
+            } else {
+                keyFuture = fetchKeyFuture = FutureUtils.createFuture();
+            }
+        }
+        if (null != keyFuture) {
+            return keyFuture;
+        } else {
+            return kvClient.get(
+                ByteSequence.fromString(getIdGenKey(scope)),
+                GetOption.DEFAULT
+            ).whenComplete(new FutureEventListener<GetResponse>() {
+                @Override
+                public void onSuccess(GetResponse response) {
+                    synchronized (EtcdIdGenerator.this) {
+                        if (response.getCount() == 0) {
+                            // no key is found.
+                            nextId = 0L;
+                            idKeyVersion = -1L;
+                        } else {
+                            KeyValue kv = response.getKvs().get(0);
+                            idKeyVersion = kv.getVersion();
+                            long curId = EtcdUtils.toLong(
+                                kv.getValue().getBytes(),
+                                0);
+                            nextId = curId + 1;
+                        }
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    keyFuture.completeExceptionally(cause);
+                    synchronized (EtcdIdGenerator.this) {
+                        fetchKeyFuture = null;
+                    }
+                }
+            }).thenApply(resp -> null);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git a/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLayoutManager.java b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLayoutManager.java
new file mode 100644
index 000000000..8aeb19fbb
--- /dev/null
+++ b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLayoutManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.metadata.etcd;
+
+import static org.apache.bookkeeper.metadata.etcd.EtcdUtils.ioResult;
+
+import com.coreos.jetcd.KV;
+import com.coreos.jetcd.Lease;
+import com.coreos.jetcd.data.ByteSequence;
+import com.coreos.jetcd.kv.GetResponse;
+import com.coreos.jetcd.options.GetOption;
+import java.io.IOException;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.meta.LedgerLayout;
+
+/**
+ * Etcd based layout manager.
+ */
+@Getter(AccessLevel.PACKAGE)
+class EtcdLayoutManager implements LayoutManager {
+
+    private final KV kvClient;
+    private final Lease leaseClient;
+    private final String scope;
+    private final ByteSequence layoutKey;
+
+    EtcdLayoutManager(KV kvClient,
+                      Lease leaseClient,
+                      String scope) {
+        this.kvClient = kvClient;
+        this.leaseClient = leaseClient;
+        this.scope = scope;
+        this.layoutKey = ByteSequence.fromString(
+            EtcdUtils.getLayoutKey(scope));
+    }
+
+    @Override
+    public LedgerLayout readLedgerLayout() throws IOException {
+        GetResponse response = ioResult(kvClient.get(layoutKey, GetOption.DEFAULT));
+        if (response.getCount() <= 0) {
+            return null;
+        } else {
+            byte[] layoutData = response.getKvs().get(0).getValue().getBytes();
+            return LedgerLayout.parseLayout(layoutData);
+        }
+    }
+
+    @Override
+    public void storeLedgerLayout(LedgerLayout layout) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteLedgerLayout() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
new file mode 100644
index 000000000..7ec1d267e
--- /dev/null
+++ b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.metadata.etcd;
+
+import com.coreos.jetcd.KV;
+import com.coreos.jetcd.data.ByteSequence;
+import com.coreos.jetcd.kv.GetResponse;
+import com.coreos.jetcd.op.Cmp;
+import com.coreos.jetcd.op.Cmp.Op;
+import com.coreos.jetcd.op.CmpTarget;
+import com.coreos.jetcd.options.GetOption;
+import com.coreos.jetcd.options.PutOption;
+import java.io.IOException;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+
+/**
+ * Etcd ledger manager.
+ */
+class EtcdLedgerManager implements LedgerManager {
+
+    private final String scope;
+    private final KV kvClient;
+
+    EtcdLedgerManager(KV kvClient, String scope) {
+        this.kvClient = kvClient;
+        this.scope = scope;
+    }
+
+    @Override
+    public void createLedgerMetadata(long ledgerId,
+                                     LedgerMetadata metadata,
+                                     GenericCallback<Void> cb) {
+        String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId);
+        kvClient.txn()
+            .If(new Cmp(
+                ByteSequence.fromString(ledgerKey),
+                Op.EQUAL,
+                CmpTarget.value(ByteSequence.fromBytes(new byte[0]))))
+            .Then(com.coreos.jetcd.op.Op.put(
+                ByteSequence.fromString(ledgerKey),
+                ByteSequence.fromBytes(metadata.serialize()),
+                PutOption.DEFAULT))
+            .Else(com.coreos.jetcd.op.Op.get(
+                ByteSequence.fromString(ledgerKey),
+                GetOption.DEFAULT))
+            .commit()
+            .thenAccept(resp -> {
+                if (resp.isSucceeded()) {
+                    metadata.setVersion(new LongVersion(0L));
+                    cb.operationComplete(Code.OK, null);
+                } else {
+                    GetResponse getResp = resp.getGetResponses().get(0);
+                    if (getResp.getCount() <= 0) {
+                        // key doesn't exist but we fail to put the key
+                        cb.operationComplete(Code.UnexpectedConditionException, null);
+                    } else {
+                        // key exists
+                        cb.operationComplete(Code.LedgerExistException, null);
+                    }
+                }
+            })
+            .exceptionally(cause -> {
+                cb.operationComplete(Code.MetaStoreException, null);
+                return null;
+            });
+    }
+
+    @Override
+    public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> cb) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb) {
+        Version v = metadata.getVersion();
+        if (Version.NEW == v || !(v instanceof LongVersion)) {
+            cb.operationComplete(Code.MetadataVersionException, null);
+            return;
+        }
+        final LongVersion lv = (LongVersion) v;
+
+        String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId);
+        kvClient.txn()
+            .If(new Cmp(
+                ByteSequence.fromString(ledgerKey),
+                Op.EQUAL,
+                CmpTarget.version(lv.getLongVersion())))
+            .Then(com.coreos.jetcd.op.Op.put(
+                ByteSequence.fromString(ledgerKey),
+                ByteSequence.fromBytes(metadata.serialize()),
+                PutOption.DEFAULT))
+            .commit()
+            .thenAccept(resp -> {
+                if (resp.isSucceeded()) {
+                    metadata.setVersion(new LongVersion(lv.getLongVersion() + 1));
+                    cb.operationComplete(Code.OK, null);
+                } else {
+                    cb.operationComplete(Code.MetadataVersionException, null);
+                }
+            })
+            .exceptionally(cause -> {
+                cb.operationComplete(Code.MetaStoreException, null);
+                return null;
+            });
+    }
+
+    @Override
+    public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void asyncProcessLedgers(Processor<Long> processor, VoidCallback finalCb, Object context, int successRc, int failureRc) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LedgerRangeIterator getLedgerRanges() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git a/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
new file mode 100644
index 000000000..c96bf332f
--- /dev/null
+++ b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.metadata.etcd;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.coreos.jetcd.KV;
+import java.io.IOException;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Etcd based ledger manager factory.
+ */
+public class EtcdLedgerManagerFactory extends LedgerManagerFactory {
+
+    private String scope;
+    private KV kvClient;
+
+    @Override
+    public int getCurrentVersion() {
+        return 0;
+    }
+
+    @Override
+    public LedgerManagerFactory initialize(AbstractConfiguration conf,
+                                           LayoutManager layoutManager,
+                                           int factoryVersion) throws IOException {
+        checkArgument(layoutManager instanceof EtcdLayoutManager);
+
+        EtcdLayoutManager etcdLayoutManager = (EtcdLayoutManager) layoutManager;
+
+        this.scope = conf.getZkLedgersRootPath();
+        this.kvClient = etcdLayoutManager.getKvClient();
+
+        return this;
+    }
+
+    @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        return new EtcdIdGenerator(kvClient, scope);
+    }
+
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new EtcdLedgerManager(kvClient, scope);
+    }
+
+    @Override
+    public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException, InterruptedException, CompatibilityException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf, LayoutManager lm) throws InterruptedException, KeeperException, IOException {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdRegistrationClient.java b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdRegistrationClient.java
new file mode 100644
index 000000000..7b0b6cbd5
--- /dev/null
+++ b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdRegistrationClient.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.metadata.etcd;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+
+import com.coreos.jetcd.Client;
+import com.coreos.jetcd.KV;
+import com.coreos.jetcd.Lease;
+import com.coreos.jetcd.Watch;
+import com.coreos.jetcd.Watch.Watcher;
+import com.coreos.jetcd.data.ByteSequence;
+import com.coreos.jetcd.options.GetOption;
+import com.coreos.jetcd.options.GetOption.SortTarget;
+import com.coreos.jetcd.options.WatchOption;
+import com.coreos.jetcd.watch.WatchEvent;
+import com.coreos.jetcd.watch.WatchResponse;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Etcd based registration client.
+ */
+public class EtcdRegistrationClient implements RegistrationClient {
+
+    private String scope;
+    private Client client;
+    private KV kvClient;
+    private Lease leaseClient;
+    private Watch watchClient;
+    private LayoutManager layoutManager;
+
+    // registration paths
+    private String bookieRegistrationPath;
+    private String bookieReadonlyRegistrationPath;
+
+    @Override
+    public RegistrationClient initialize(ClientConfiguration conf,
+                                         ScheduledExecutorService scheduler,
+                                         StatsLogger statsLogger,
+                                         Optional<ZooKeeper> zkOptional) throws BKException {
+        // initialize etcd kv and lease client
+        this.scope = conf.getZkLedgersRootPath();
+
+        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/writable";
+        this.bookieReadonlyRegistrationPath = conf.getZkAvailableBookiesPath() + "/" + READONLY;
+
+        // TODO: initialize etcd
+        this.client = Client.builder()
+            .endpoints(conf.getZkServers()) // TODO: make a more general name
+            .build();
+        this.kvClient = client.getKVClient();
+        this.leaseClient = client.getLeaseClient();
+        this.watchClient = client.getWatchClient();
+
+        this.layoutManager = new EtcdLayoutManager(kvClient, leaseClient, scope);
+        return this;
+    }
+
+    @Override
+    public void close() {
+        kvClient.close();
+        leaseClient.close();
+        watchClient.close();
+        client.close();
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Set<BookieSocketAddress>>> getWritableBookies() {
+        return kvClient.get(
+            ByteSequence.fromString(this.bookieRegistrationPath),
+            GetOption.newBuilder()
+                .withRange(ByteSequence.fromString(this.bookieRegistrationPath + "_/"))
+                .withSortField(SortTarget.KEY)
+                .withLimit(-1)
+                .build())
+            .thenApply(response -> {
+                // TODO: process the response and convert it to BookieSocketAddress
+
+                return null;
+            });
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Set<BookieSocketAddress>>> getReadOnlyBookies() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> watchWritableBookies(RegistrationListener listener) {
+        Watcher watcher = watchClient.watch(
+            ByteSequence.fromString(this.bookieRegistrationPath),
+            WatchOption.newBuilder()
+                .withRange(ByteSequence.fromString(this.bookieRegistrationPath + "_/"))
+                .build());
+        // TODO: need a better async interface for continous listening on watch response.
+        // TODO: need to put listener into a loop
+        try {
+            WatchResponse response = watcher.listen();
+            for (WatchEvent event : response.getEvents()) {
+                switch (event.getEventType()) {
+                    case PUT:
+                        // TODO: process a new node
+                        break;
+                    case DELETE:
+                        // TODO: process when an old node is removed
+                        break;
+                    default:
+                        break;
+                }
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return FutureUtils.Void();
+    }
+
+    @Override
+    public void unwatchWritableBookies(RegistrationListener listener) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener listener) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unwatchReadOnlyBookies(RegistrationListener listener) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LayoutManager getLayoutManager() {
+        return layoutManager;
+    }
+}
diff --git a/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdRegistrationManager.java b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdRegistrationManager.java
new file mode 100644
index 000000000..cf3680766
--- /dev/null
+++ b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdRegistrationManager.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.metadata.etcd;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+
+import com.coreos.jetcd.Client;
+import com.coreos.jetcd.KV;
+import com.coreos.jetcd.Lease;
+import com.coreos.jetcd.Lease.KeepAliveListener;
+import com.coreos.jetcd.Watch;
+import com.coreos.jetcd.data.ByteSequence;
+import com.coreos.jetcd.op.Cmp;
+import com.coreos.jetcd.op.Cmp.Op;
+import com.coreos.jetcd.op.CmpTarget;
+import com.coreos.jetcd.options.PutOption;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
+/**
+ * Etcd registration manager.
+ */
+public class EtcdRegistrationManager implements RegistrationManager {
+
+    private String scope;
+    private Client client;
+    private KV kvClient;
+    private Lease leaseClient;
+    private Watch watchClient;
+    private LayoutManager layoutManager;
+    private long leaseId;
+    private ScheduledExecutorService listenerExecutor;
+
+    // registration paths
+    private String bookieRegistrationPath;
+    private String bookieReadonlyRegistrationPath;
+
+    @Override
+    public RegistrationManager initialize(ServerConfiguration conf, RegistrationListener listener, StatsLogger statsLogger) throws BookieException {
+        // initialize etcd kv and lease client
+        this.scope = conf.getZkLedgersRootPath();
+
+        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/writable";
+        this.bookieReadonlyRegistrationPath = conf.getZkAvailableBookiesPath() + "/" + READONLY;
+
+        // TODO: initialize etcd
+        this.client = Client.builder()
+            .endpoints(conf.getZkServers()) // TODO: make a more general name
+            .build();
+        this.kvClient = client.getKVClient();
+        this.leaseClient = client.getLeaseClient();
+        this.watchClient = client.getWatchClient();
+
+        this.layoutManager = new EtcdLayoutManager(kvClient, leaseClient, scope);
+        return this;
+    }
+
+    @Override
+    public void close() {
+        kvClient.close();
+        leaseClient.close();
+        watchClient.close();
+        client.close();
+    }
+
+    @Override
+    public String getClusterInstanceId() throws BookieException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void registerBookie(String bookieId, boolean readOnly) throws BookieException {
+        // create 30 seconds lease
+        try {
+            this.leaseId = this.leaseClient.grant(30).get().getID();
+        } catch (InterruptedException e) {
+            // TODO:
+        } catch (ExecutionException e) {
+            // TODO:
+        }
+        // keep the lease alive
+        KeepAliveListener keepAliveListener = this.leaseClient.keepAlive(leaseId);
+        // TODO: process the keep alive listener, and propagate keep alive responses to listener
+        //       when a lease is expired.
+
+        String regPath;
+        if (readOnly) {
+            regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+        } else {
+            regPath = bookieRegistrationPath + "/" + bookieId;
+        }
+        try {
+            kvClient.txn()
+                .If(new Cmp(
+                    ByteSequence.fromString(regPath),
+                    Op.EQUAL,
+                    CmpTarget.value(ByteSequence.fromBytes(new byte[0]))))
+                .Then(com.coreos.jetcd.op.Op.put(
+                    ByteSequence.fromString(regPath),
+                    ByteSequence.fromBytes(new byte[0]),
+                    PutOption.newBuilder()
+                        .withLeaseId(leaseId) // use the lease id
+                        .build()))
+                .commit()
+                .get();
+        } catch (InterruptedException e) {
+            // TODO:
+        } catch (ExecutionException e) {
+            // TODO:
+        }
+
+    }
+
+
+    @Override
+    public void unregisterBookie(String bookieId, boolean readOnly) throws BookieException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isBookieRegistered(String bookieId) throws BookieException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void writeCookie(String bookieId, Versioned<byte[]> cookieData) throws BookieException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Versioned<byte[]> readCookie(String bookieId) throws BookieException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeCookie(String bookieId, Version version) throws BookieException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LayoutManager getLayoutManager() {
+        return layoutManager;
+    }
+
+    @Override
+    public boolean prepareFormat() throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean initNewCluster() throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean format() throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean nukeExistingCluster() throws Exception {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdUtils.java b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdUtils.java
new file mode 100644
index 000000000..f418c86d1
--- /dev/null
+++ b/metadata-stores/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.metadata.etcd;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.LAYOUT_ZNODE;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+
+/**
+ * Utils for etcd based metadata store.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+final class EtcdUtils {
+
+    private static final String LEDGER_IDGEN_PREFIX = "IDGEN";
+    private static final int DEFAULT_IDGEN_BUCKET = 0;
+
+    static String getIdGenKey(String scope) {
+        // TODO: add a bucket id here to allow improving this to a concurrent
+        //       id generation approach.
+        return String.format("%s/%s/%06d",
+            scope,
+            LEDGER_IDGEN_PREFIX,
+            DEFAULT_IDGEN_BUCKET);
+    }
+
+    static String getLayoutKey(String scope) {
+        return String.format("%s/%s",
+            scope, LAYOUT_ZNODE);
+    }
+
+    static String getLedgerKey(String scope, long ledgerId) {
+        return String.format("%s/ledgers/%019d", scope, ledgerId);
+    }
+
+    static <T> T ioResult(CompletableFuture<T> future) throws IOException {
+        return FutureUtils.result(future, cause -> {
+            if (cause instanceof IOException) {
+                return (IOException) cause;
+            } else {
+                return new IOException(cause);
+            }
+        });
+    }
+
+    public static long toLong(byte[] memory, int index) {
+        return ((long) memory[index] & 0xff) << 56
+            | ((long) memory[index + 1] & 0xff) << 48
+            | ((long) memory[index + 2] & 0xff) << 40
+            | ((long) memory[index + 3] & 0xff) << 32
+            | ((long) memory[index + 4] & 0xff) << 24
+            | ((long) memory[index + 5] & 0xff) << 16
+            | ((long) memory[index + 6] & 0xff) << 8
+            | (long) memory[index + 7] & 0xff;
+    }
+
+    /**
+     * Convert a long number to a bytes array.
+     *
+     * @param value the long number
+     * @return the bytes array
+     */
+    public static byte[] toBytes(long value) {
+        byte[] memory = new byte[8];
+        toBytes(value, memory, 0);
+        return memory;
+    }
+
+    public static void toBytes(long value, byte[] memory, int index) {
+        memory[index] = (byte) (value >>> 56);
+        memory[index + 1] = (byte) (value >>> 48);
+        memory[index + 2] = (byte) (value >>> 40);
+        memory[index + 3] = (byte) (value >>> 32);
+        memory[index + 4] = (byte) (value >>> 24);
+        memory[index + 5] = (byte) (value >>> 16);
+        memory[index + 6] = (byte) (value >>> 8);
+        memory[index + 7] = (byte) value;
+    }
+
+}
diff --git a/metadata-stores/pom.xml b/metadata-stores/pom.xml
new file mode 100644
index 000000000..ce79bc6cd
--- /dev/null
+++ b/metadata-stores/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>bookkeeper</artifactId>
+        <groupId>org.apache.bookkeeper</groupId>
+        <version>4.7.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.bookkeeper.metadata</groupId>
+    <artifactId>metadata-stores-parent</artifactId>
+    <packaging>pom</packaging>
+    <name>Apache BookKeeper :: Metadata Stores :: Parent</name>
+    <modules>
+        <module>etcd</module>
+    </modules>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+
+</project>
diff --git a/pom.xml b/pom.xml
index 0742417d8..f3a108451 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@
     <module>bookkeeper-stats</module>
     <module>bookkeeper-proto</module>
     <module>bookkeeper-server</module>
+    <module>metadata-stores</module>
     <module>bookkeeper-benchmark</module>
     <module>bookkeeper-stats-providers</module>
     <module>bookkeeper-http</module>
@@ -104,6 +105,7 @@
     <!-- dependencies -->
     <commons-configuration.version>1.10</commons-configuration.version>
     <commons-lang3.version>3.3.2</commons-lang3.version>
+    <etcd.version>0.0.1</etcd.version>
     <google.code.version>3.0.2</google.code.version>
     <google.errorprone.version>2.1.2</google.errorprone.version>
     <guava.version>20.0</guava.version>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services