Posted to commits@ozone.apache.org by sa...@apache.org on 2020/07/22 13:00:38 UTC

[hadoop-ozone] 28/39: HDDS-3965. SCM failed to start up for duplicated pipeline detected. (#1210)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch ozone-0.6.0
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 6cdfc7d64755cfdf601d9da85493fecde5ed3e46
Author: avijayanhwx <14...@users.noreply.github.com>
AuthorDate: Fri Jul 17 11:51:44 2020 -0700

    HDDS-3965. SCM failed to start up for duplicated pipeline detected. (#1210)
    
    (cherry picked from commit ca4c5a154bfda6c176775138f809b17e6af1d77e)
---
 .../hadoop/hdds/utils/db/RDBStoreIterator.java     |  40 +++--
 .../hadoop/hdds/utils/db/TestRDBStoreIterator.java |  10 +-
 .../hadoop/hdds/utils/db/TestRDBTableStore.java    |  61 +++++++
 .../hdds/scm/pipeline/SCMPipelineManager.java      |   6 +
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  80 +++++++++
 ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java | 180 +++++++++++++++++++++
 6 files changed, 360 insertions(+), 17 deletions(-)
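
Editorial note (inferred from the diff below): RDBStoreIterator#next() used to advance the RocksDB cursor before returning, so a following removeFromDB() deleted whatever key the cursor had already moved on to instead of the entry just returned. SCMPipelineManager's old-key-format migration relies on exactly that remove-after-next pattern, so the stale old-format pipeline record survived and was read back alongside the rewritten one on the next start, failing SCM startup with a duplicated pipeline. A plain-Java illustration of the pitfall (not Ozone code, hypothetical keys):

    import java.util.TreeMap;

    public class CursorRemoveDemo {
      public static void main(String[] args) {
        TreeMap<String, String> db = new TreeMap<>();
        db.put("a-old-format-key", "pipeline-1");    // entry we intend to rewrite
        db.put("b-other-key", "pipeline-2");

        String cursor = db.firstKey();               // seekToFirst()
        String returned = cursor;                    // next() hands out this entry ...
        cursor = db.higherKey(cursor);               // ... and advances the cursor

        db.remove(cursor);                           // old removeFromDB(): deletes "b-other-key"
        System.out.println(db.containsKey(returned)); // true -> old-format key survives
        // Fix: remember the entry returned by next()/seek() and delete that key.
      }
    }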

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
index 5902486..ffe5f96 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
@@ -24,6 +24,8 @@ import java.util.NoSuchElementException;
 import java.util.function.Consumer;
 
 import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * RocksDB store iterator.
@@ -31,12 +33,16 @@ import org.rocksdb.RocksIterator;
 public class RDBStoreIterator
     implements TableIterator<byte[], ByteArrayKeyValue> {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RDBStoreIterator.class);
+
   private RocksIterator rocksDBIterator;
   private RDBTable rocksDBTable;
+  private ByteArrayKeyValue currentEntry;
 
   public RDBStoreIterator(RocksIterator iterator) {
     this.rocksDBIterator = iterator;
-    rocksDBIterator.seekToFirst();
+    seekToFirst();
   }
 
   public RDBStoreIterator(RocksIterator iterator, RDBTable table) {
@@ -52,6 +58,15 @@ public class RDBStoreIterator
     }
   }
 
+  private void setCurrentEntry() {
+    if (rocksDBIterator.isValid()) {
+      currentEntry = ByteArrayKeyValue.create(rocksDBIterator.key(),
+          rocksDBIterator.value());
+    } else {
+      currentEntry = null;
+    }
+  }
+
   @Override
   public boolean hasNext() {
     return rocksDBIterator.isValid();
@@ -59,12 +74,10 @@ public class RDBStoreIterator
 
   @Override
   public ByteArrayKeyValue next() {
-    if (rocksDBIterator.isValid()) {
-      ByteArrayKeyValue value =
-          ByteArrayKeyValue.create(rocksDBIterator.key(), rocksDBIterator
-              .value());
+    setCurrentEntry();
+    if (currentEntry != null) {
       rocksDBIterator.next();
-      return value;
+      return currentEntry;
     }
     throw new NoSuchElementException("RocksDB Store has no more elements");
   }
@@ -72,21 +85,20 @@ public class RDBStoreIterator
   @Override
   public void seekToFirst() {
     rocksDBIterator.seekToFirst();
+    setCurrentEntry();
   }
 
   @Override
   public void seekToLast() {
     rocksDBIterator.seekToLast();
+    setCurrentEntry();
   }
 
   @Override
   public ByteArrayKeyValue seek(byte[] key) {
     rocksDBIterator.seek(key);
-    if (rocksDBIterator.isValid()) {
-      return ByteArrayKeyValue.create(rocksDBIterator.key(),
-          rocksDBIterator.value());
-    }
-    return null;
+    setCurrentEntry();
+    return currentEntry;
   }
 
   @Override
@@ -111,8 +123,10 @@ public class RDBStoreIterator
     if (rocksDBTable == null) {
       throw new UnsupportedOperationException("remove");
     }
-    if (rocksDBIterator.isValid()) {
-      rocksDBTable.delete(rocksDBIterator.key());
+    if (currentEntry != null) {
+      rocksDBTable.delete(currentEntry.getKey());
+    } else {
+      LOG.info("Unable to delete currentEntry as it does not exist.");
     }
   }
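
With this change removeFromDB() targets the entry the iterator last handed out via next()/seek() (or the first entry right after construction), instead of whatever key the underlying RocksDB cursor has advanced to. A hedged usage sketch of the pattern this enables, using the Table/TableIterator types touched by this patch; isOldFormatKey() and toNewFormatKey() are hypothetical helpers and exception handling is trimmed:

    import java.io.IOException;
    import org.apache.hadoop.hdds.utils.db.Table;
    import org.apache.hadoop.hdds.utils.db.TableIterator;

    void rewriteOldKeys(Table<byte[], byte[]> table) throws IOException {
      TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> it =
          table.iterator();
      while (it.hasNext()) {
        Table.KeyValue<byte[], byte[]> kv = it.next();
        if (isOldFormatKey(kv.getKey())) {                       // hypothetical predicate
          it.removeFromDB();                                     // deletes kv's key, not the next one
          table.put(toNewFormatKey(kv.getKey()), kv.getValue()); // hypothetical helper
        }
      }
    }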
 
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
index 6e85977..fcb7dd2 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
@@ -58,12 +58,14 @@ public class TestRDBStoreIterator {
   @Test
   public void testForeachRemainingCallsConsumerWithAllElements() {
     when(rocksDBIteratorMock.isValid())
-        .thenReturn(true, true, true, true, true, true, false);
+        .thenReturn(true, true, true, true, true, true, true, false);
     when(rocksDBIteratorMock.key())
-        .thenReturn(new byte[]{0x00}, new byte[]{0x01}, new byte[]{0x02})
+        .thenReturn(new byte[]{0x00}, new byte[]{0x00}, new byte[]{0x01},
+            new byte[]{0x02})
         .thenThrow(new NoSuchElementException());
     when(rocksDBIteratorMock.value())
-        .thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d})
+        .thenReturn(new byte[]{0x7f}, new byte[]{0x7f}, new byte[]{0x7e},
+            new byte[]{0x7d})
         .thenThrow(new NoSuchElementException());
 
 
@@ -91,7 +93,7 @@ public class TestRDBStoreIterator {
 
   @Test
   public void testHasNextDependsOnIsvalid(){
-    when(rocksDBIteratorMock.isValid()).thenReturn(true, false);
+    when(rocksDBIteratorMock.isValid()).thenReturn(true, true, false);
 
     RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
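
The extra stubbed values above are needed because the constructor now runs seekToFirst() -> setCurrentEntry(), which calls isValid() once (and key()/value() when the cursor is valid) before the test body touches the iterator. A minimal illustration, assuming the same Mockito/JUnit static imports as this test class:

    RocksIterator rocksDBIteratorMock = mock(RocksIterator.class);
    when(rocksDBIteratorMock.isValid()).thenReturn(true, true, false);

    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
    // The first "true" was consumed by the constructor's setCurrentEntry() call.
    assertTrue(iter.hasNext());    // second "true"
    assertFalse(iter.hasNext());   // "false"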
 
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index 00d05a1..5d00763 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -364,4 +364,65 @@ public class TestRDBTableStore {
       Assert.assertTrue(keyCount > 0 && keyCount <= numKeys);
     }
   }
+
+  @Test
+  public void testIteratorRemoveFromDB() throws Exception {
+
+    // Remove without next removes first entry.
+    try (Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth")) {
+      writeToTable(testTable, 3);
+      TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator =
+          testTable.iterator();
+      iterator.removeFromDB();
+      Assert.assertNull(testTable.get("1".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNotNull(testTable.get("2".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNotNull(testTable.get("3".getBytes(StandardCharsets.UTF_8)));
+    }
+
+    // Remove after seekToLast removes lastEntry
+    try (Table<byte[], byte[]> testTable = rdbStore.getTable("Sixth")) {
+      writeToTable(testTable, 3);
+      TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator =
+          testTable.iterator();
+      iterator.seekToLast();
+      iterator.removeFromDB();
+      Assert.assertNotNull(testTable.get("1".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNotNull(testTable.get("2".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNull(testTable.get("3".getBytes(StandardCharsets.UTF_8)));
+    }
+
+    // Remove after seek deletes that entry.
+    try (Table<byte[], byte[]> testTable = rdbStore.getTable("Sixth")) {
+      writeToTable(testTable, 3);
+      TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator =
+          testTable.iterator();
+      iterator.seek("3".getBytes(StandardCharsets.UTF_8));
+      iterator.removeFromDB();
+      Assert.assertNotNull(testTable.get("1".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNotNull(testTable.get("2".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNull(testTable.get("3".getBytes(StandardCharsets.UTF_8)));
+    }
+
+    // Remove after next() deletes entry that was returned by next.
+    try (Table<byte[], byte[]> testTable = rdbStore.getTable("Sixth")) {
+      writeToTable(testTable, 3);
+      TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator =
+          testTable.iterator();
+      iterator.seek("2".getBytes(StandardCharsets.UTF_8));
+      iterator.next();
+      iterator.removeFromDB();
+      Assert.assertNotNull(testTable.get("1".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNull(testTable.get("2".getBytes(StandardCharsets.UTF_8)));
+      Assert.assertNotNull(testTable.get("3".getBytes(StandardCharsets.UTF_8)));
+    }
+  }
+
+  private void writeToTable(Table testTable, int num) throws IOException {
+    for (int i = 1; i <= num; i++) {
+      byte[] key = (i + "").getBytes(StandardCharsets.UTF_8);
+      byte[] value =
+          RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+      testTable.put(key, value);
+    }
+  }
 }
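
A note on the first case above: no next() or seek() precedes removeFromDB(), yet key "1" is removed. That works because constructing the iterator now seeks to the first entry and records it as currentEntry:

    TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator =
        testTable.iterator();  // currentEntry already points at key "1"
    iterator.removeFromDB();   // deletes "1" without any prior next()/seek()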
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index fda9371..6fce895 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -209,6 +209,7 @@ public class SCMPipelineManager implements PipelineManager {
   ) {
     if (!pipelineID.equals(pipeline.getId())) {
       try {
+        LOG.info("Found pipeline in old format key : {}", pipeline.getId());
         it.removeFromDB();
         pipelineStore.put(pipeline.getId(), pipeline);
       } catch (IOException e) {
@@ -701,4 +702,9 @@ public class SCMPipelineManager implements PipelineManager {
       startPipelineCreator();
     }
   }
+
+  @VisibleForTesting
+  protected static Logger getLog() {
+    return LOG;
+  }
 }
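
For orientation, a hedged reconstruction of the migration path around the hunk above (the enclosing method is not part of this diff; the loop framing is an assumption and exception handling is trimmed). A table key whose PipelineID does not match the ID inside the stored Pipeline is treated as an old-format key, deleted via the fixed removeFromDB(), and re-written under the new key:

    TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>> it =
        pipelineStore.iterator();
    while (it.hasNext()) {
      Table.KeyValue<PipelineID, Pipeline> kv = it.next();
      PipelineID pipelineID = kv.getKey();
      Pipeline pipeline = kv.getValue();
      if (!pipelineID.equals(pipeline.getId())) {
        LOG.info("Found pipeline in old format key : {}", pipeline.getId());
        it.removeFromDB();                           // relies on the iterator fix above
        pipelineStore.put(pipeline.getId(), pipeline);
      }
      // the pipeline is then registered in the in-memory state map (not shown here)
    }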
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index fc8f61a..62289b9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -22,10 +22,13 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -56,12 +59,15 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_L
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.After;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.InOrder;
 
 import static org.mockito.Mockito.doReturn;
@@ -617,6 +623,80 @@ public class TestSCMPipelineManager {
     verify(pipelineStore, never()).put(p2.getId(), p2);
   }
 
+  @Test
+  public void testScmWithPipelineDBKeyFormatChange() throws Exception {
+    TemporaryFolder tempDir = new TemporaryFolder();
+    tempDir.create();
+    File dir = tempDir.newFolder();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.getAbsolutePath());
+
+    SCMMetadataStore scmDbWithOldKeyFormat = null;
+    Map<UUID, Pipeline> oldPipelines = new HashMap<>();
+    try {
+      scmDbWithOldKeyFormat =
+          new TestSCMStoreImplWithOldPipelineIDKeyFormat(conf);
+      // Create 3 pipelines.
+      for (int i = 0; i < 3; i++) {
+        Pipeline pipeline = pipelineStub();
+        scmDbWithOldKeyFormat.getPipelineTable()
+            .put(pipeline.getId(), pipeline);
+        oldPipelines.put(pipeline.getId().getId(), pipeline);
+      }
+    } finally {
+      if (scmDbWithOldKeyFormat != null) {
+        scmDbWithOldKeyFormat.stop();
+      }
+    }
+
+    LogCapturer logCapturer =
+        LogCapturer.captureLogs(SCMPipelineManager.getLog());
+
+    // Create SCMPipelineManager with new DBDefinition.
+    SCMMetadataStore newScmMetadataStore = null;
+    try {
+      newScmMetadataStore = new SCMMetadataStoreImpl(conf);
+      SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
+          nodeManager,
+          newScmMetadataStore.getPipelineTable(),
+          new EventQueue());
+
+      waitForLog(logCapturer);
+      assertEquals(3, pipelineManager.getPipelines().size());
+      oldPipelines.values().forEach(p ->
+          pipelineManager.containsPipeline(p.getId()));
+    } finally {
+      newScmMetadataStore.stop();
+    }
+
+    // Mimicking another restart.
+    try {
+      logCapturer.clearOutput();
+      newScmMetadataStore = new SCMMetadataStoreImpl(conf);
+      SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
+          nodeManager,
+          newScmMetadataStore.getPipelineTable(),
+          new EventQueue());
+      try {
+        waitForLog(logCapturer);
+        Assert.fail("Unexpected log: " + logCapturer.getOutput());
+      } catch (TimeoutException ex) {
+        Assert.assertTrue(ex.getMessage().contains("Timed out"));
+      }
+      assertEquals(3, pipelineManager.getPipelines().size());
+      oldPipelines.values().forEach(p ->
+          pipelineManager.containsPipeline(p.getId()));
+    } finally {
+      newScmMetadataStore.stop();
+    }
+  }
+
+  private static void waitForLog(LogCapturer logCapturer)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> logCapturer.getOutput()
+            .contains("Found pipeline in old format key"),
+        1000, 5000);
+  }
+
   private Pipeline pipelineStub() {
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
new file mode 100644
index 0000000..a04ecea
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+
+/**
+ * Test SCM Metadata Store that has ONLY the pipeline table whose key uses the
+ * old codec format.
+ */
+public class TestSCMStoreImplWithOldPipelineIDKeyFormat
+    implements SCMMetadataStore {
+
+  private DBStore store;
+  private final OzoneConfiguration configuration;
+  private Table<PipelineID, Pipeline> pipelineTable;
+
+  public TestSCMStoreImplWithOldPipelineIDKeyFormat(
+      OzoneConfiguration config) throws IOException {
+    this.configuration = config;
+    start(configuration);
+  }
+
+  @Override
+  public void start(OzoneConfiguration config)
+      throws IOException {
+    if (this.store == null) {
+      this.store = DBStoreBuilder.createDBStore(config,
+          new SCMDBTestDefinition());
+      pipelineTable = PIPELINES.getTable(store);
+    }
+  }
+
+  @Override
+  public void stop() throws Exception {
+    if (store != null) {
+      store.close();
+      store = null;
+    }
+  }
+
+  @Override
+  public DBStore getStore() {
+    return null;
+  }
+
+  @Override
+  public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
+    return null;
+  }
+
+  @Override
+  public Long getCurrentTXID() {
+    return null;
+  }
+
+  @Override
+  public Long getNextDeleteBlockTXID() {
+    return null;
+  }
+
+  @Override
+  public Table<BigInteger, X509Certificate> getValidCertsTable() {
+    return null;
+  }
+
+  @Override
+  public Table<BigInteger, X509Certificate> getRevokedCertsTable() {
+    return null;
+  }
+
+  @Override
+  public TableIterator getAllCerts(CertificateStore.CertType certType) {
+    return null;
+  }
+
+  @Override
+  public Table<PipelineID, Pipeline> getPipelineTable() {
+    return pipelineTable;
+  }
+
+  @Override
+  public BatchOperationHandler getBatchHandler() {
+    return null;
+  }
+
+  @Override
+  public Table<ContainerID, ContainerInfo> getContainerTable() {
+    return null;
+  }
+
+  /**
+   * Test SCM DB Definition for the above class.
+   */
+  public static class SCMDBTestDefinition implements DBDefinition {
+
+    public static final DBColumnFamilyDefinition<PipelineID, Pipeline>
+        PIPELINES =
+        new DBColumnFamilyDefinition<>(
+            "pipelines",
+            PipelineID.class,
+            new OldPipelineIDCodec(),
+            Pipeline.class,
+            new PipelineCodec());
+
+    @Override
+    public String getName() {
+      return "scm.db";
+    }
+
+    @Override
+    public String getLocationConfigKey() {
+      return ScmConfigKeys.OZONE_SCM_DB_DIRS;
+    }
+
+    @Override
+    public DBColumnFamilyDefinition[] getColumnFamilies() {
+      return new DBColumnFamilyDefinition[] {PIPELINES};
+    }
+  }
+
+  /**
+   * Old Pipeline ID codec that relies on protobuf serialization.
+   */
+  public static class OldPipelineIDCodec implements Codec<PipelineID> {
+    @Override
+    public byte[] toPersistedFormat(PipelineID object) throws IOException {
+      return object.getProtobuf().toByteArray();
+    }
+
+    @Override
+    public PipelineID fromPersistedFormat(byte[] rawData) throws IOException {
+      return null;
+    }
+
+    @Override
+    public PipelineID copyObject(PipelineID object) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+}
+

