You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2019/02/19 18:44:41 UTC

[hadoop] branch trunk updated: HDDS-1101. SCM CA: Write Certificate information to SCM Metadata. Contributed by Anu Engineer.

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cf1a66d  HDDS-1101. SCM CA: Write Certificate information to SCM Metadata. Contributed by Anu Engineer.
cf1a66d is described below

commit cf1a66d0da32b86ea69d5817df1aba0294fa66f9
Author: Xiaoyu Yao <xy...@apache.org>
AuthorDate: Tue Feb 19 10:42:56 2019 -0800

    HDDS-1101. SCM CA: Write Certificate information to SCM Metadata. Contributed by Anu Engineer.
---
 .../certificate/authority/CertificateStore.java    |  80 ++++++++++++++
 .../certificate/authority/DefaultApprover.java     |   7 +-
 .../certificate/authority/DefaultCAServer.java     |  28 ++++-
 .../x509/certificate/utils/CertificateCodec.java   |  40 ++++---
 .../java/org/apache/hadoop/utils/db/Codec.java     |   6 +-
 .../org/apache/hadoop/utils/db/CodecRegistry.java  |   5 +-
 .../org/apache/hadoop/utils/db/StringCodec.java    |   5 +-
 .../java/org/apache/hadoop/utils/db/Table.java     |   4 +-
 .../org/apache/hadoop/utils/db/TableIterator.java  |   5 +-
 .../org/apache/hadoop/utils/db/TypedTable.java     |   8 +-
 .../x509/certificate/authority/MockCAStore.java}   |  35 +++++--
 .../certificate/authority/TestDefaultCAServer.java |  10 +-
 .../apache/hadoop/utils/db/TestRDBTableStore.java  |  17 +--
 .../hadoop/utils/db/TestTypedRDBTableStore.java    |   8 +-
 .../{LongCodec.java => BigIntegerCodec.java}       |  18 ++--
 .../metadata/DeletedBlocksTransactionCodec.java    |   7 +-
 .../apache/hadoop/hdds/scm/metadata/LongCodec.java |   7 +-
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |  36 ++++++-
 .../hdds/scm/metadata/SCMMetadataStoreRDBImpl.java |  60 ++++++++++-
 ...sactionCodec.java => X509CertificateCodec.java} |  36 ++++---
 .../hadoop/hdds/scm/server/SCMCertStore.java       | 115 +++++++++++++++++++++
 .../hdds/scm/server/StorageContainerManager.java   |  25 +++--
 .../hadoop/ozone/om/codec/OmBucketInfoCodec.java   |   5 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |   5 +-
 .../ozone/om/codec/OmMultipartKeyInfoCodec.java    |   7 +-
 .../hadoop/ozone/om/codec/OmVolumeArgsCodec.java   |   5 +-
 .../hadoop/ozone/om/codec/VolumeListCodec.java     |   5 +-
 .../om/codec/TestOmMultipartKeyInfoCodec.java      |  18 +++-
 28 files changed, 495 insertions(+), 112 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
new file mode 100644
index 0000000..961d048
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.security.x509.certificate.authority;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
+
+/**
+ * This interface allows the DefaultCA to be portable and use different DB
+ * interfaces later. It also allows us define this interface in the SCM layer
+ * by which we don't have to take a circular dependency between hdds-common
+ * and the SCM.
+ *
+ * With this interface, DefaultCA server read and write DB or persistence
+ * layer and we can write to SCM's Metadata DB.
+ */
+public interface CertificateStore {
+
+  /**
+   * Writes a new certificate that was issued to the persistent store.
+   * @param serialID - Certificate Serial Number.
+   * @param certificate - Certificate to persist.
+   * @throws IOException - on Failure.
+   */
+  void storeValidCertificate(BigInteger serialID,
+                             X509Certificate certificate) throws IOException;
+
+  /**
+   * Moves a certificate in a transactional manner from valid certificate to
+   * revoked certificate state.
+   * @param serialID - Serial ID of the certificate.
+   * @throws IOException
+   */
+  void revokeCertificate(BigInteger serialID) throws IOException;
+
+  /**
+   * Deletes an expired certificate from the store. Please note: We don't
+   * remove revoked certificates, we need that information to generate the
+   * CRLs.
+   * @param serialID - Certificate ID.
+   */
+  void removeExpiredCertificate(BigInteger serialID) throws IOException;
+
+  /**
+   * Retrieves a Certificate based on the Serial number of that certificate.
+   * @param serialID - ID of the certificate.
+   * @param certType
+   * @return X509Certificate
+   * @throws IOException
+   */
+  X509Certificate getCertificateByID(BigInteger serialID, CertType certType)
+      throws IOException;
+
+  /**
+   * Different kind of Certificate stores.
+   */
+  enum CertType {
+    VALID_CERTS,
+    REVOKED_CERTS
+  }
+
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
index 9e08321..827c82f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
@@ -19,10 +19,10 @@
 
 package org.apache.hadoop.hdds.security.x509.certificate.authority;
 
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.PKIProfile;
+import org.apache.hadoop.util.Time;
 import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
 import org.bouncycastle.cert.X509CertificateHolder;
@@ -100,9 +100,8 @@ public class DefaultApprover extends BaseApprover {
     X509v3CertificateBuilder certificateGenerator =
         new X509v3CertificateBuilder(
             caCertificate.getSubject(),
-            // When we do persistence we will check if the certificate number
-            // is a duplicate.
-            new BigInteger(RandomUtils.nextBytes(8)),
+            // Serial is not sequential but it is monotonically increasing.
+            BigInteger.valueOf(Time.monotonicNowNanos()),
             validFrom,
             validTill,
             certificationRequest.getSubject(), keyInfo);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index 40e2c70..ef289e1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -115,18 +115,21 @@ public class DefaultCAServer implements CertificateServer {
    */
   private PKIProfile profile;
   private CertificateApprover approver;
+  private CertificateStore store;
 
   /**
    * Create an Instance of DefaultCAServer.
-   *
-   * @param subject - String Subject
+   *  @param subject - String Subject
    * @param clusterID - String ClusterID
    * @param scmID - String SCMID.
+   * @param certificateStore - A store used to persist Certificates.
    */
-  public DefaultCAServer(String subject, String clusterID, String scmID) {
+  public DefaultCAServer(String subject, String clusterID, String scmID,
+                         CertificateStore certificateStore) {
     this.subject = subject;
     this.clusterID = clusterID;
     this.scmID = scmID;
+    this.store = certificateStore;
   }
 
   @Override
@@ -207,12 +210,15 @@ public class DefaultCAServer implements CertificateServer {
             getCAKeys().getPrivate(),
             getCACertificate(), java.sql.Date.valueOf(beginDate),
             java.sql.Date.valueOf(endDate), csr);
+        store.storeValidCertificate(xcert.getSerialNumber(),
+            CertificateCodec.getX509Certificate(xcert));
         xcertHolder.complete(xcert);
         break;
       default:
         return null; // cannot happen, keeping checkstyle happy.
       }
-    } catch (IOException | OperatorCreationException e) {
+    } catch (CertificateException | IOException | OperatorCreationException e) {
+      LOG.error("Unable to issue a certificate. {}", e);
       xcertHolder.completeExceptionally(new SCMSecurityException(e));
     }
     return xcertHolder;
@@ -230,7 +236,19 @@ public class DefaultCAServer implements CertificateServer {
   public Future<Boolean> revokeCertificate(X509Certificate certificate,
       CertificateApprover.ApprovalType approverType)
       throws SCMSecurityException {
-    return null;
+    CompletableFuture<Boolean> revoked = new CompletableFuture<>();
+    if (certificate == null) {
+      revoked.completeExceptionally(new SCMSecurityException(
+          "Certificate cannot be null"));
+      return revoked;
+    }
+    try {
+      store.revokeCertificate(certificate.getSerialNumber());
+    } catch (IOException ex) {
+      LOG.error("Revoking the certificate failed. {}", ex.getCause());
+      throw new SCMSecurityException(ex);
+    }
+    return revoked;
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
index b3c40ef..71c6d95 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
@@ -104,34 +104,41 @@ public class CertificateCodec {
   }
 
   /**
-   * Get Certificate location.
+   * Returns the Certificate as a PEM encoded String.
    *
-   * @return Path
+   * @param x509CertHolder - X.509 Certificate Holder.
+   * @return PEM Encoded Certificate String.
+   * @throws SCMSecurityException - On failure to create a PEM String.
    */
-  public Path getLocation() {
-    return location;
+  public static String getPEMEncodedString(X509CertificateHolder x509CertHolder)
+      throws SCMSecurityException {
+    try {
+      return getPEMEncodedString(getX509Certificate(x509CertHolder));
+    } catch (CertificateException exp) {
+      throw new SCMSecurityException(exp);
+    }
   }
 
   /**
    * Returns the Certificate as a PEM encoded String.
    *
-   * @param x509CertHolder - X.509 Certificate Holder.
+   * @param certificate - X.509 Certificate.
    * @return PEM Encoded Certificate String.
    * @throws SCMSecurityException - On failure to create a PEM String.
    */
-  public static String getPEMEncodedString(X509CertificateHolder x509CertHolder)
+  public static String getPEMEncodedString(X509Certificate certificate)
       throws SCMSecurityException {
     try {
       StringWriter stringWriter = new StringWriter();
       try (JcaPEMWriter pemWriter = new JcaPEMWriter(stringWriter)) {
-        pemWriter.writeObject(getX509Certificate(x509CertHolder));
+        pemWriter.writeObject(certificate);
       }
       return stringWriter.toString();
-    } catch (CertificateException | IOException e) {
-      LOG.error("Error in encoding certificate." + x509CertHolder
-          .getSubject().toString(), e);
+    } catch (IOException e) {
+      LOG.error("Error in encoding certificate." + certificate
+          .getSubjectDN().toString(), e);
       throw new SCMSecurityException("PEM Encoding failed for certificate." +
-          x509CertHolder.getSubject().toString(), e);
+          certificate.getSubjectDN().toString(), e);
     }
   }
 
@@ -143,7 +150,7 @@ public class CertificateCodec {
    * @throws CertificateException - Thrown on Failure.
    * @throws IOException          - Thrown on Failure.
    */
-  public X509Certificate getX509Certificate(String pemEncodedString)
+  public static X509Certificate getX509Certificate(String pemEncodedString)
       throws CertificateException, IOException {
     CertificateFactory fact = CertificateFactory.getInstance("X.509");
     try (InputStream input = IOUtils.toInputStream(pemEncodedString, UTF_8)) {
@@ -152,6 +159,15 @@ public class CertificateCodec {
   }
 
   /**
+   * Get Certificate location.
+   *
+   * @return Path
+   */
+  public Path getLocation() {
+    return location;
+  }
+
+  /**
    * Write the Certificate pointed to the location by the configs.
    *
    * @param xCertificate - Certificate to write.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Codec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Codec.java
index 69637dc..15720ea 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Codec.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Codec.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.utils.db;
 
+import java.io.IOException;
+
 /**
  * Codec interface to marshall/unmarshall data to/from a byte[] based
  * key/value store.
@@ -30,12 +32,12 @@ public interface Codec<T> {
    * Convert object to raw persisted format.
    * @param object The original java object. Should not be null.
    */
-  byte[] toPersistedFormat(T object);
+  byte[] toPersistedFormat(T object) throws IOException;
 
   /**
    * Convert object from raw persisted format.
    *
    * @param rawData Byte array from the key/value store. Should not be null.
    */
-  T fromPersistedFormat(byte[] rawData);
+  T fromPersistedFormat(byte[] rawData) throws IOException;
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java
index 9c367ba..afa72ad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.utils.db;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,7 +44,7 @@ public class CodecRegistry {
    * @param <T>     Type of the return value.
    * @return the object with the parsed field data
    */
-  public <T> T asObject(byte[] rawData, Class<T> format) {
+  public <T> T asObject(byte[] rawData, Class<T> format) throws IOException {
     if (rawData == null) {
       return null;
     }
@@ -62,7 +63,7 @@ public class CodecRegistry {
    * @param <T>    Type of the typed object.
    * @return byte array to store it ini the kv store.
    */
-  public <T> byte[] asRawData(T object) {
+  public <T> byte[] asRawData(T object) throws IOException {
     Preconditions.checkNotNull(object,
         "Null value shouldn't be persisted in the database");
     Class<T> format = (Class<T>) object.getClass();
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/StringCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/StringCodec.java
index 36e4bc3..5f38d68 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/StringCodec.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/StringCodec.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.utils.db;
 
+import java.io.IOException;
 import org.apache.hadoop.hdfs.DFSUtil;
 
 /**
@@ -26,7 +27,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 public class StringCodec implements Codec<String> {
 
   @Override
-  public byte[] toPersistedFormat(String object) {
+  public byte[] toPersistedFormat(String object) throws IOException {
     if (object != null) {
       return DFSUtil.string2Bytes(object);
     } else {
@@ -35,7 +36,7 @@ public class StringCodec implements Codec<String> {
   }
 
   @Override
-  public String fromPersistedFormat(byte[] rawData) {
+  public String fromPersistedFormat(byte[] rawData) throws IOException {
     if (rawData != null) {
       return DFSUtil.bytes2String(rawData);
     } else {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
index e39c872..2f14e77 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
@@ -102,8 +102,8 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
    */
   interface KeyValue<KEY, VALUE> {
 
-    KEY getKey();
+    KEY getKey() throws IOException;
 
-    VALUE getValue();
+    VALUE getValue() throws IOException;
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java
index fcd8535..2e8e7ed 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.utils.db;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -45,13 +46,13 @@ public interface TableIterator<KEY, T> extends Iterator<T>, Closeable {
    * @param key - Bytes that represent the key.
    * @return VALUE.
    */
-  T seek(KEY key);
+  T seek(KEY key) throws IOException;
 
   /**
    * Returns the key value at the current position.
    * @return KEY
    */
-  KEY key();
+  KEY key() throws IOException;
 
   /**
    * Returns the VALUE at the current position.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
index f715572..667822b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
@@ -123,12 +123,12 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     }
 
     @Override
-    public KEY getKey() {
+    public KEY getKey() throws IOException {
       return codecRegistry.asObject(rawKeyValue.getKey(), keyType);
     }
 
     @Override
-    public VALUE getValue() {
+    public VALUE getValue() throws IOException {
       return codecRegistry.asObject(rawKeyValue.getValue(), valueType);
     }
   }
@@ -163,7 +163,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     }
 
     @Override
-    public TypedKeyValue seek(KEY key) {
+    public TypedKeyValue seek(KEY key) throws IOException {
       byte[] keyBytes = codecRegistry.asRawData(key);
       KeyValue<byte[], byte[]> result = rawIterator.seek(keyBytes);
       if (result == null) {
@@ -173,7 +173,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     }
 
     @Override
-    public KEY key() {
+    public KEY key() throws IOException {
       byte[] result = rawIterator.key();
       if (result == null) {
         return null;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
similarity index 53%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
copy to hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
index 2d495ab..1dea512 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
@@ -17,23 +17,38 @@
  *
  */
 
-package org.apache.hadoop.hdds.scm.metadata;
+package org.apache.hadoop.hdds.security.x509.certificate.authority;
 
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.utils.db.Codec;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
 
 /**
- * Codec for Persisting the DeletedBlocks.
+ *
  */
-public class LongCodec implements Codec<Long> {
+public class MockCAStore implements CertificateStore {
+  @Override
+  public void storeValidCertificate(BigInteger serialID,
+                                    X509Certificate certificate)
+      throws IOException {
+
+  }
 
   @Override
-  public byte[] toPersistedFormat(Long object) {
-    return Longs.toByteArray(object);
+  public void revokeCertificate(BigInteger serialID) throws IOException {
+
+  }
+
+  @Override
+  public void removeExpiredCertificate(BigInteger serialID)
+      throws IOException {
+
   }
 
   @Override
-  public Long fromPersistedFormat(byte[] rawData) {
-    return Longs.fromByteArray(rawData);
+  public X509Certificate getCertificateByID(BigInteger serialID,
+                                            CertType certType)
+      throws IOException {
+    return null;
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
index ba02ba7..8d807b7 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
@@ -51,10 +51,12 @@ public class TestDefaultCAServer {
   private static OzoneConfiguration conf = new OzoneConfiguration();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private MockCAStore caStore;
 
   @Before
   public void init() throws IOException {
     conf.set(OZONE_METADATA_DIRS, temporaryFolder.newFolder().toString());
+    caStore = new MockCAStore();
   }
 
   @Test
@@ -63,7 +65,7 @@ public class TestDefaultCAServer {
     SecurityConfig securityConfig = new SecurityConfig(conf);
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4));
+        RandomStringUtils.randomAlphabetic(4), caStore);
     testCA.init(securityConfig, CertificateServer.CAType.SELF_SIGNED_CA);
     X509CertificateHolder first = testCA.getCACertificate();
     assertNotNull(first);
@@ -88,7 +90,7 @@ public class TestDefaultCAServer {
     SecurityConfig securityConfig = new SecurityConfig(conf);
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4));
+        RandomStringUtils.randomAlphabetic(4), caStore);
     Consumer<SecurityConfig> caInitializer =
         ((DefaultCAServer) testCA).processVerificationStatus(
         DefaultCAServer.VerificationStatus.MISSING_CERTIFICATE);
@@ -108,7 +110,7 @@ public class TestDefaultCAServer {
     SecurityConfig securityConfig = new SecurityConfig(conf);
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4));
+        RandomStringUtils.randomAlphabetic(4), caStore);
     Consumer<SecurityConfig> caInitializer =
         ((DefaultCAServer) testCA).processVerificationStatus(
             DefaultCAServer.VerificationStatus.MISSING_KEYS);
@@ -155,7 +157,7 @@ public class TestDefaultCAServer {
 
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
-        RandomStringUtils.randomAlphabetic(4));
+        RandomStringUtils.randomAlphabetic(4), caStore);
     testCA.init(new SecurityConfig(conf),
         CertificateServer.CAType.SELF_SIGNED_CA);
 
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java
index 4dbf792..38d30c1 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.utils.db;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,6 +57,16 @@ public class TestRDBTableStore {
   private RDBStore rdbStore = null;
   private DBOptions options = null;
 
+  private static boolean consume(Table.KeyValue keyValue)  {
+    count++;
+    try {
+      Assert.assertNotNull(keyValue.getKey());
+    } catch(IOException ex) {
+      Assert.fail("Unexpected Exception " + ex.toString());
+    }
+    return true;
+  }
+
   @Before
   public void setUp() throws Exception {
     options = new DBOptions();
@@ -191,12 +202,6 @@ public class TestRDBTableStore {
     }
   }
 
-  private static boolean consume(Table.KeyValue keyValue) {
-    count++;
-    Assert.assertNotNull(keyValue.getKey());
-    return true;
-  }
-
   @Test
   public void forEachAndIterator() throws Exception {
     final int iterCount = 100;
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
index 2f28497..4d3b1bf 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
@@ -199,7 +199,11 @@ public class TestTypedRDBTableStore {
 
   private static boolean consume(Table.KeyValue keyValue) {
     count++;
-    Assert.assertNotNull(keyValue.getKey());
+    try {
+      Assert.assertNotNull(keyValue.getKey());
+    } catch (IOException ex) {
+      Assert.fail(ex.toString());
+    }
     return true;
   }
 
@@ -232,4 +236,4 @@ public class TestTypedRDBTableStore {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/BigIntegerCodec.java
similarity index 72%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/BigIntegerCodec.java
index 2d495ab..6b1c458 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/BigIntegerCodec.java
@@ -19,21 +19,21 @@
 
 package org.apache.hadoop.hdds.scm.metadata;
 
-import com.google.common.primitives.Longs;
+import java.io.IOException;
+import java.math.BigInteger;
 import org.apache.hadoop.utils.db.Codec;
 
 /**
- * Codec for Persisting the DeletedBlocks.
+ * Encode and decode BigInteger.
  */
-public class LongCodec implements Codec<Long> {
-
+public class BigIntegerCodec implements Codec<BigInteger> {
   @Override
-  public byte[] toPersistedFormat(Long object) {
-    return Longs.toByteArray(object);
+  public byte[] toPersistedFormat(BigInteger object) throws IOException {
+    return object.toByteArray();
   }
 
   @Override
-  public Long fromPersistedFormat(byte[] rawData) {
-    return Longs.fromByteArray(rawData);
+  public BigInteger fromPersistedFormat(byte[] rawData) throws IOException {
+    return new BigInteger(rawData);
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
index 5deb3aa..ec186b8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.metadata;
 
 
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.utils.db.Codec;
 
@@ -30,12 +31,14 @@ import org.apache.hadoop.utils.db.Codec;
 public class DeletedBlocksTransactionCodec
     implements Codec<DeletedBlocksTransaction> {
   @Override
-  public byte[] toPersistedFormat(DeletedBlocksTransaction object) {
+  public byte[] toPersistedFormat(DeletedBlocksTransaction object)
+      throws IOException {
     return object.toByteArray();
   }
 
   @Override
-  public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData) {
+  public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData)
+      throws IOException {
     try {
       return DeletedBlocksTransaction.parseFrom(rawData);
     } catch (InvalidProtocolBufferException e) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
index 2d495ab..a4a9084 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.scm.metadata;
 
 import com.google.common.primitives.Longs;
+import java.io.IOException;
 import org.apache.hadoop.utils.db.Codec;
 
 /**
@@ -28,12 +29,12 @@ import org.apache.hadoop.utils.db.Codec;
 public class LongCodec implements Codec<Long> {
 
   @Override
-  public byte[] toPersistedFormat(Long object) {
+  public byte[] toPersistedFormat(Long object) throws IOException {
     return Longs.toByteArray(object);
   }
 
   @Override
-  public Long fromPersistedFormat(byte[] rawData) {
+  public Long fromPersistedFormat(byte[] rawData) throws IOException {
     return Longs.fromByteArray(rawData);
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 8009e66..164522d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hdds.scm.metadata;
 
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import java.io.IOException;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import org.apache.hadoop.utils.db.DBStore;
 import org.apache.hadoop.utils.db.Table;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.utils.db.TableIterator;
 
 /**
  * Generic interface for data stores for SCM.
@@ -53,21 +57,47 @@ public interface SCMMetadataStore {
   DBStore getStore();
 
   /**
-   *  A Table that keeps the deleted blocks lists and transactions.
-    * @return Table
+   * A Table that keeps the deleted blocks lists and transactions.
+   *
+   * @return Table
    */
   Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable();
 
   /**
    * Returns the current TXID for the deleted blocks.
+   *
    * @return Long
    */
   Long getCurrentTXID();
 
   /**
    * Returns the next TXID for the Deleted Blocks.
+   *
    * @return Long.
-  */
+   */
   Long getNextDeleteBlockTXID();
 
+  /**
+   * A table that maintains all the valid certificates issued by the SCM CA.
+   *
+   * @return Table
+   */
+  Table<BigInteger, X509Certificate> getValidCertsTable();
+
+  /**
+   * A Table that maintains all revoked certificates until they expire.
+   *
+   * @return Table.
+   */
+  Table<BigInteger, X509Certificate> getRevokedCertsTable();
+
+  /**
+   * Returns the list of Certificates of a specific type.
+   *
+   * @param certType - CertType.
+   * @return Iterator<X509Certificate>
+   */
+  TableIterator getAllCerts(CertificateStore.CertType certType);
+
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
index 49a643d..e2983be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hdds.scm.metadata;
 
 import java.io.File;
+import java.math.BigInteger;
 import java.nio.file.Paths;
+import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import java.io.IOException;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.utils.db.DBStore;
 import org.apache.hadoop.utils.db.DBStoreBuilder;
@@ -37,17 +40,28 @@ import static org.apache.hadoop.ozone.OzoneConsts.SCM_DB_NAME;
  * A RocksDB based implementation of SCM Metadata Store.
  * <p>
  * <p>
- * +---------------+------------+-------------------------+
- * | Column Family |    Key     |          Value          |
- * +---------------+------------+-------------------------+
- * | DeletedBlocks | TXID(Long) | DeletedBlockTransaction |
- * +---------------+------------+-------------------------+
+ * +---------------+------------------+-------------------------+
+ * | Column Family |    Key           |          Value          |
+ * +---------------+------------------+-------------------------+
+ * | DeletedBlocks | TXID(Long)       | DeletedBlockTransaction |
+ * +---------------+------------------+-------------------------+
+ * | ValidCerts    | Serial (BigInt)  | X509Certificate         |
+ * +---------------+------------------+-------------------------+
+ * |RevokedCerts   | Serial (BigInt)  | X509Certificate         |
+ * +---------------+------------------+-------------------------+
  */
 public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
 
   private static final String DELETED_BLOCKS_TABLE = "deletedBlocks";
   private Table deletedBlocksTable;
 
+  private static final String VALID_CERTS_TABLE = "validCerts";
+  private Table validCertsTable;
+
+  private static final String REVOKED_CERTS_TABLE = "revokedCerts";
+  private Table revokedCertsTable;
+
+
 
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMMetadataStoreRDBImpl.class);
@@ -77,13 +91,26 @@ public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
           .setName(SCM_DB_NAME)
           .setPath(Paths.get(metaDir.getPath()))
           .addTable(DELETED_BLOCKS_TABLE)
+          .addTable(VALID_CERTS_TABLE)
+          .addTable(REVOKED_CERTS_TABLE)
           .addCodec(DeletedBlocksTransaction.class,
               new DeletedBlocksTransactionCodec())
           .addCodec(Long.class, new LongCodec())
+          .addCodec(BigInteger.class, new BigIntegerCodec())
+          .addCodec(X509Certificate.class, new X509CertificateCodec())
           .build();
+
       deletedBlocksTable = this.store.getTable(DELETED_BLOCKS_TABLE,
           Long.class, DeletedBlocksTransaction.class);
       checkTableStatus(deletedBlocksTable, DELETED_BLOCKS_TABLE);
+
+      validCertsTable = this.store.getTable(VALID_CERTS_TABLE,
+          BigInteger.class, X509Certificate.class);
+      checkTableStatus(validCertsTable, VALID_CERTS_TABLE);
+
+      revokedCertsTable = this.store.getTable(REVOKED_CERTS_TABLE,
+          BigInteger.class, X509Certificate.class);
+      checkTableStatus(revokedCertsTable, REVOKED_CERTS_TABLE);
     }
   }
 
@@ -111,6 +138,29 @@ public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
   }
 
   @Override
+  public Table<BigInteger, X509Certificate> getValidCertsTable() {
+    return validCertsTable;
+  }
+
+  @Override
+  public Table<BigInteger, X509Certificate> getRevokedCertsTable() {
+    return revokedCertsTable;
+  }
+
+  @Override
+  public TableIterator getAllCerts(CertificateStore.CertType certType) {
+    if(certType == CertificateStore.CertType.VALID_CERTS) {
+      return validCertsTable.iterator();
+    }
+
+    if(certType == CertificateStore.CertType.REVOKED_CERTS) {
+      return revokedCertsTable.iterator();
+    }
+
+    return null;
+  }
+
+  @Override
   public Long getCurrentTXID() {
     return this.txID.get();
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/X509CertificateCodec.java
similarity index 50%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/X509CertificateCodec.java
index 5deb3aa..bfb3868 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/X509CertificateCodec.java
@@ -19,28 +19,36 @@
 
 package org.apache.hadoop.hdds.scm.metadata;
 
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.utils.db.Codec;
 
 /**
- * Codec for Persisting the DeletedBlocks.
+ * Encodes and Decodes X509Certificate Class.
  */
-public class DeletedBlocksTransactionCodec
-    implements Codec<DeletedBlocksTransaction> {
+public class X509CertificateCodec implements Codec<X509Certificate> {
   @Override
-  public byte[] toPersistedFormat(DeletedBlocksTransaction object) {
-    return object.toByteArray();
+  public byte[] toPersistedFormat(X509Certificate object) throws IOException {
+    try {
+      return CertificateCodec.getPEMEncodedString(object)
+          .getBytes(Charset.forName("UTF-8"));
+    } catch (SCMSecurityException exp) {
+      throw new IOException(exp);
+    }
   }
 
   @Override
-  public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData) {
-    try {
-      return DeletedBlocksTransaction.parseFrom(rawData);
-    } catch (InvalidProtocolBufferException e) {
-      throw new IllegalArgumentException(
-          "Can't convert rawBytes to DeletedBlocksTransaction.", e);
+  public X509Certificate fromPersistedFormat(byte[] rawData)
+      throws IOException {
+    try{
+      String s = new String(rawData, Charset.forName("UTF-8"));
+      return CertificateCodec.getX509Certificate(s);
+    } catch (CertificateException exp) {
+      throw new IOException(exp);
     }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
new file mode 100644
index 0000000..b7a2f7c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.server;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Certificate Store class that persists certificates issued by SCM CA.
+ */
+public class SCMCertStore implements CertificateStore {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMCertStore.class);
+  private final SCMMetadataStore scmMetadataStore;
+  private final Lock lock;
+
+  public SCMCertStore(SCMMetadataStore dbStore) {
+    this.scmMetadataStore = dbStore;
+    lock = new ReentrantLock();
+
+  }
+
+  @Override
+  public void storeValidCertificate(BigInteger serialID,
+                                    X509Certificate certificate)
+      throws IOException {
+    lock.lock();
+    try {
+      // This makes sure that no certificate IDs are reusable.
+      if ((getCertificateByID(serialID, CertType.VALID_CERTS) == null) &&
+          (getCertificateByID(serialID, CertType.REVOKED_CERTS) == null)) {
+        scmMetadataStore.getValidCertsTable().put(serialID, certificate);
+      } else {
+        throw new SCMSecurityException("Conflicting certificate ID");
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void revokeCertificate(BigInteger serialID) throws IOException {
+    lock.lock();
+    try {
+      X509Certificate cert = getCertificateByID(serialID, CertType.VALID_CERTS);
+      if (cert == null) {
+        LOG.error("trying to revoke a certificate that is not valid. Serial: " +
+            "{}", serialID.toString());
+        throw new SCMSecurityException("Trying to revoke an invalid " +
+            "certificate.");
+      }
+      // TODO : Check if we are trying to revoke an expired certificate.
+
+      if (getCertificateByID(serialID, CertType.REVOKED_CERTS) != null) {
+        LOG.error("Trying to revoke a certificate that is already revoked.");
+        throw new SCMSecurityException("Trying to revoke an already revoked " +
+            "certificate.");
+      }
+
+      // let is do this in a transaction.
+      try (BatchOperation batch =
+               scmMetadataStore.getStore().initBatchOperation();) {
+        scmMetadataStore.getRevokedCertsTable()
+            .putWithBatch(batch, serialID, cert);
+        scmMetadataStore.getValidCertsTable().deleteWithBatch(batch, serialID);
+        scmMetadataStore.getStore().commitBatchOperation(batch);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void removeExpiredCertificate(BigInteger serialID)
+      throws IOException {
+    // TODO: Later this allows removal of expired certificates from the system.
+  }
+
+  @Override
+  public X509Certificate getCertificateByID(BigInteger serialID,
+                                            CertType certType)
+      throws IOException {
+    if (certType == CertType.VALID_CERTS) {
+      return scmMetadataStore.getValidCertsTable().get(serialID);
+    } else {
+      return scmMetadataStore.getRevokedCertsTable().get(serialID);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 3221128..9bf47e6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -254,8 +254,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
      * passed before any artifacts like SCM DB is created. So please don't
      * add any other initialization above the Security checks please.
      */
+    if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+      loginAsSCMUser(conf);
+    }
+
+    // Creates the SCM DBs or opens them if it exists.
+    // A valid pointer to the store is required by all the other services below.
+    initalizeMetadataStore(conf, configurator);
 
-    // Authenticate SCM if security is enabled
+    // Authenticate SCM if security is enabled, this initialization can only
+    // be done after the metadata store is initialized.
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
       initializeCAnSecurityProtocol(conf, configurator);
     } else {
@@ -266,8 +274,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       securityProtocolServer = null;
     }
 
-    // Creates the SCM DBs or opens them if it exists.
-    initalizeMetadataStore(conf, configurator);
 
     eventQueue = new EventQueue();
     long watcherTimeout =
@@ -438,11 +444,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    */
   private void initializeCAnSecurityProtocol(OzoneConfiguration conf,
                                              SCMConfigurator configurator)
-      throws IOException, AuthenticationException {
-    loginAsSCMUser(conf);
+      throws IOException {
     if(configurator.getCertificateServer() != null) {
       this.certificateServer = configurator.getCertificateServer();
     } else {
+      // This assumes that SCM init has run, and DB metadata stores are created.
       certificateServer = initializeCertificateServer(
           getScmStorageConfig().getClusterID(),
           getScmStorageConfig().getScmId());
@@ -515,7 +521,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     // TODO: Support Certificate Server loading via Class Name loader.
     // So it is easy to use different Certificate Servers if needed.
     String subject = "scm@" + InetAddress.getLocalHost().getHostName();
-    return new DefaultCAServer(subject, clusterID, scmID);
+    if(this.scmMetadataStore == null) {
+      LOG.error("Cannot initialize Certificate Server without a valid meta " +
+          "data layer.");
+      throw new SCMException("Cannot initialize CA without a valid metadata " +
+          "store", ResultCodes.SCM_NOT_INITIALIZED);
+    }
+    SCMCertStore certStore = new SCMCertStore(this.scmMetadataStore);
+    return new DefaultCAServer(subject, clusterID, scmID, certStore);
   }
 
   /**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmBucketInfoCodec.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmBucketInfoCodec.java
index ba42193..c8a211e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmBucketInfoCodec.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmBucketInfoCodec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.codec;
 
+import java.io.IOException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
 import org.apache.hadoop.utils.db.Codec;
@@ -30,14 +31,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class OmBucketInfoCodec implements Codec<OmBucketInfo> {
 
   @Override
-  public byte[] toPersistedFormat(OmBucketInfo object) {
+  public byte[] toPersistedFormat(OmBucketInfo object) throws IOException {
     Preconditions
         .checkNotNull(object, "Null object can't be converted to byte array.");
     return object.getProtobuf().toByteArray();
   }
 
   @Override
-  public OmBucketInfo fromPersistedFormat(byte[] rawData) {
+  public OmBucketInfo fromPersistedFormat(byte[] rawData) throws IOException {
     Preconditions
         .checkNotNull(rawData,
             "Null byte array can't converted to real object.");
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java
index e71085d..7475ced 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmKeyInfoCodec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.codec;
 
+import java.io.IOException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.utils.db.Codec;
@@ -30,14 +31,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class OmKeyInfoCodec implements Codec<OmKeyInfo> {
 
   @Override
-  public byte[] toPersistedFormat(OmKeyInfo object) {
+  public byte[] toPersistedFormat(OmKeyInfo object) throws IOException {
     Preconditions
         .checkNotNull(object, "Null object can't be converted to byte array.");
     return object.getProtobuf().toByteArray();
   }
 
   @Override
-  public OmKeyInfo fromPersistedFormat(byte[] rawData) {
+  public OmKeyInfo fromPersistedFormat(byte[] rawData) throws IOException {
     Preconditions
         .checkNotNull(rawData,
             "Null byte array can't converted to real object.");
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java
index 5071bfd..ce37037 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmMultipartKeyInfoCodec.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om.codec;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.utils.db.Codec;
@@ -30,7 +31,8 @@ import org.apache.hadoop.utils.db.Codec;
 public class OmMultipartKeyInfoCodec implements Codec<OmMultipartKeyInfo> {
 
   @Override
-  public byte[] toPersistedFormat(OmMultipartKeyInfo object) {
+  public byte[] toPersistedFormat(OmMultipartKeyInfo object)
+      throws IOException {
     Preconditions.checkNotNull(object,
         "Null object can't be converted to byte array.");
     return object.getProto().toByteArray();
@@ -42,7 +44,8 @@ public class OmMultipartKeyInfoCodec implements Codec<OmMultipartKeyInfo> {
    * Construct {@link OmMultipartKeyInfo} from byte[]. If unable to convert
    * return null.
    */
-  public OmMultipartKeyInfo fromPersistedFormat(byte[] rawData) {
+  public OmMultipartKeyInfo fromPersistedFormat(byte[] rawData)
+      throws IOException {
     Preconditions.checkNotNull(rawData,
         "Null byte array can't converted to real object.");
     try {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmVolumeArgsCodec.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmVolumeArgsCodec.java
index fb7ce72..372557a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmVolumeArgsCodec.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/OmVolumeArgsCodec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.codec;
 
+import java.io.IOException;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.utils.db.Codec;
@@ -30,14 +31,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class OmVolumeArgsCodec implements Codec<OmVolumeArgs> {
 
   @Override
-  public byte[] toPersistedFormat(OmVolumeArgs object) {
+  public byte[] toPersistedFormat(OmVolumeArgs object) throws IOException {
     Preconditions
         .checkNotNull(object, "Null object can't be converted to byte array.");
     return object.getProtobuf().toByteArray();
   }
 
   @Override
-  public OmVolumeArgs fromPersistedFormat(byte[] rawData) {
+  public OmVolumeArgs fromPersistedFormat(byte[] rawData) throws IOException {
     Preconditions
         .checkNotNull(rawData,
             "Null byte array can't converted to real object.");
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/VolumeListCodec.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/VolumeListCodec.java
index dca3e47..45b1183 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/VolumeListCodec.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/VolumeListCodec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.codec;
 
+import java.io.IOException;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.utils.db.Codec;
 
@@ -29,14 +30,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class VolumeListCodec implements Codec<VolumeList> {
 
   @Override
-  public byte[] toPersistedFormat(VolumeList object) {
+  public byte[] toPersistedFormat(VolumeList object) throws IOException {
     Preconditions
         .checkNotNull(object, "Null object can't be converted to byte array.");
     return object.toByteArray();
   }
 
   @Override
-  public VolumeList fromPersistedFormat(byte[] rawData) {
+  public VolumeList fromPersistedFormat(byte[] rawData) throws IOException {
     Preconditions
         .checkNotNull(rawData,
             "Null byte array can't converted to real object.");
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java
index e524447..7a537c0 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmMultipartKeyInfoCodec.java
@@ -36,18 +36,30 @@ public class TestOmMultipartKeyInfoCodec {
     OmMultipartKeyInfoCodec codec = new OmMultipartKeyInfoCodec();
     OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo(UUID
         .randomUUID().toString(), new HashMap<>());
-    byte[] data = codec.toPersistedFormat(omMultipartKeyInfo);
+    byte[] data = new byte[0];
+    try {
+      data = codec.toPersistedFormat(omMultipartKeyInfo);
+    } catch (java.io.IOException e) {
+      e.printStackTrace();
+    }
     Assert.assertNotNull(data);
 
-    OmMultipartKeyInfo multipartKeyInfo = codec.fromPersistedFormat(data);
+    OmMultipartKeyInfo multipartKeyInfo = null;
+    try {
+      multipartKeyInfo = codec.fromPersistedFormat(data);
+    } catch (java.io.IOException e) {
+      e.printStackTrace();
+    }
     Assert.assertEquals(omMultipartKeyInfo, multipartKeyInfo);
 
     // When random byte data passed returns null.
     try {
-      codec.fromPersistedFormat("radom".getBytes());
+      codec.fromPersistedFormat("random".getBytes());
     } catch (IllegalArgumentException ex) {
       GenericTestUtils.assertExceptionContains("Can't encode the the raw " +
           "data from the byte array", ex);
+    } catch (java.io.IOException e) {
+      e.printStackTrace();
     }
 
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org