You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2021/01/20 17:54:41 UTC
[ozone] branch HDDS-2823 updated: HDDS-3208. Implement Ratis
snapshot on SCM (#1725)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 3ff677d HDDS-3208. Implement Ratis snapshot on SCM (#1725)
3ff677d is described below
commit 3ff677d9400fe3c033e011a8a3bbc91cfb13684f
Author: Rui Wang <am...@163.com>
AuthorDate: Wed Jan 20 09:54:24 2021 -0800
HDDS-3208. Implement Ratis snapshot on SCM (#1725)
---
.../hdds/scm/container/ContainerManagerImpl.java | 9 +-
.../scm/container/ContainerStateManagerImpl.java | 40 ++++--
...{SCMHAManager.java => DBTransactionBuffer.java} | 38 +++---
.../hdds/scm/ha/MockDBTransactionBuffer.java | 82 ++++++++++++
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 17 ++-
.../hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java | 112 ++++++++++++++++
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 5 +
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 13 +-
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 7 +-
.../hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java | 71 ++++++++++
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 64 ++++++++-
.../hadoop/hdds/scm/ha/SCMTransactionInfo.java | 145 +++++++++++++++++++++
.../hadoop/hdds/scm/metadata/SCMDBDefinition.java | 13 +-
.../hadoop/hdds/scm/metadata/SCMMetadataStore.java | 6 +
.../hdds/scm/metadata/SCMMetadataStoreImpl.java | 13 ++
.../hdds/scm/metadata/SCMTransactionInfoCodec.java | 48 +++++++
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 1 +
.../scm/pipeline/PipelineStateManagerV2Impl.java | 25 +++-
.../hdds/scm/server/StorageContainerManager.java | 14 +-
.../scm/metadata/TestSCMTransactionInfoCodec.java | 78 +++++++++++
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 35 ++++-
...TestSCMStoreImplWithOldPipelineIDKeyFormat.java | 6 +
.../apache/hadoop/hdds/scm/TestSCMSnapshot.java | 109 ++++++++++++++++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 1 +
.../ozone/recon/scm/ReconContainerManager.java | 7 +-
.../scm/ReconStorageContainerManagerFacade.java | 1 +
.../scm/AbstractReconContainerManagerTest.java | 1 +
.../ozone/recon/scm/TestReconContainerManager.java | 5 +
28 files changed, 902 insertions(+), 64 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 91684ce..c0ace46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -110,6 +111,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
.setPipelineManager(pipelineManager)
.setRatisServer(scmHaManager.getRatisServer())
.setContainerStore(containerStore)
+ .setSCMDBTransactionBuffer(scmHaManager.getDBTransactionBuffer())
.build();
this.numContainerPerVolume = conf
@@ -117,8 +119,6 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
-
-
}
@Override
@@ -397,4 +397,9 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
protected ContainerStateManagerV2 getContainerStateManager() {
return containerStateManager;
}
+
+ @VisibleForTesting
+ public SCMHAManager getSCMHAManager() {
+ return haManager;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 68e4d35..c26afef 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.ha.CheckedConsumer;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.ExecutionUtil;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
@@ -99,7 +100,9 @@ public final class ContainerStateManagerImpl
/**
* Persistent store for Container States.
*/
- private final Table<ContainerID, ContainerInfo> containerStore;
+ private Table<ContainerID, ContainerInfo> containerStore;
+
+ private final DBTransactionBuffer transactionBuffer;
/**
* PipelineManager instance.
@@ -135,7 +138,8 @@ public final class ContainerStateManagerImpl
*/
private ContainerStateManagerImpl(final Configuration conf,
final PipelineManager pipelineManager,
- final Table<ContainerID, ContainerInfo> containerStore)
+ final Table<ContainerID, ContainerInfo> containerStore,
+ final DBTransactionBuffer buffer)
throws IOException {
this.pipelineManager = pipelineManager;
this.containerStore = containerStore;
@@ -144,6 +148,7 @@ public final class ContainerStateManagerImpl
this.containers = new ContainerStateMap();
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerStateChangeActions = getContainerStateChangeActions();
+ this.transactionBuffer = buffer;
initialize();
}
@@ -298,12 +303,16 @@ public final class ContainerStateManagerImpl
try {
if (!containers.contains(containerID)) {
ExecutionUtil.create(() -> {
- containerStore.put(containerID, container);
+ containerStore.putWithBatch(
+ transactionBuffer.getCurrentBatchOperation(),
+ containerID, container);
containers.addContainer(container);
pipelineManager.addContainerToPipeline(pipelineID, containerID);
}).onException(() -> {
containers.removeContainer(containerID);
- containerStore.delete(containerID);
+ containerStore.deleteWithBatch(
+ transactionBuffer.getCurrentBatchOperation(),
+ containerID);
}).execute();
}
} finally {
@@ -339,13 +348,17 @@ public final class ContainerStateManagerImpl
if (newState.getNumber() > oldState.getNumber()) {
ExecutionUtil.create(() -> {
containers.updateState(id, oldState, newState);
- containerStore.put(id, containers.getContainerInfo(id));
+ containerStore.putWithBatch(
+ transactionBuffer.getCurrentBatchOperation(),
+ id, containers.getContainerInfo(id));
}).onException(() -> {
- containerStore.put(id, oldInfo);
+ containerStore.putWithBatch(
+ transactionBuffer.getCurrentBatchOperation(),
+ id, oldInfo);
containers.updateState(id, newState, oldState);
}).execute();
- containerStateChangeActions.getOrDefault(event, info -> {})
- .execute(oldInfo);
+ containerStateChangeActions.getOrDefault(event, info -> {
+ }).execute(oldInfo);
}
}
} finally {
@@ -475,7 +488,9 @@ public final class ContainerStateManagerImpl
final ContainerID cid = ContainerID.getFromProtobuf(id);
final ContainerInfo containerInfo = containers.getContainerInfo(cid);
ExecutionUtil.create(() -> {
- containerStore.delete(cid);
+ containerStore.deleteWithBatch(
+ transactionBuffer.getCurrentBatchOperation(),
+ cid);
containers.removeContainer(cid);
}).onException(() -> containerStore.put(cid, containerInfo)).execute();
} finally {
@@ -504,7 +519,12 @@ public final class ContainerStateManagerImpl
private PipelineManager pipelineMgr;
private SCMRatisServer scmRatisServer;
private Table<ContainerID, ContainerInfo> table;
+ private DBTransactionBuffer transactionBuffer;
+ public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
+ this.transactionBuffer = buffer;
+ return this;
+ }
public Builder setConfiguration(final Configuration config) {
conf = config;
return this;
@@ -533,7 +553,7 @@ public final class ContainerStateManagerImpl
Preconditions.checkNotNull(table);
final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
- conf, pipelineMgr, table);
+ conf, pipelineMgr, table, transactionBuffer);
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java
similarity index 56%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java
index 8fe6d7f..ae74939 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java
@@ -14,28 +14,30 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.hadoop.hdds.scm.ha;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.Closeable;
import java.io.IOException;
/**
- * SCMHAManager provides HA service for SCM.
+ * DB transaction buffer that buffers SCM DB transactions. Call the flush
+ * method to flush a batch into SCM DB. This buffer also maintains the latest
+ * transaction info, which describes the latest transaction in the buffer.
*/
-public interface SCMHAManager {
-
- /**
- * Starts HA service.
- */
- void start() throws IOException;
-
- /**
- * Returns RatisServer instance associated with the SCM instance.
- */
- SCMRatisServer getRatisServer();
-
- /**
- * Stops the HA service.
- */
- void shutdown() throws IOException;
+public interface DBTransactionBuffer extends Closeable {
+
+ BatchOperation getCurrentBatchOperation();
+
+ void updateLatestTrxInfo(SCMTransactionInfo info);
+
+ SCMTransactionInfo getLatestTrxInfo();
+
+ SnapshotInfo getLatestSnapshot();
+
+ void setLatestSnapshot(SnapshotInfo latestSnapshot);
+
+ void flush() throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java
new file mode 100644
index 0000000..2a1d615
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+
+public class MockDBTransactionBuffer implements DBTransactionBuffer {
+ private DBStore dbStore;
+ private BatchOperation currentBatchOperation;
+
+ public MockDBTransactionBuffer() {
+ }
+
+ public MockDBTransactionBuffer(DBStore store) {
+ this.dbStore = store;
+ }
+
+ @Override
+ public BatchOperation getCurrentBatchOperation() {
+ if (currentBatchOperation == null) {
+ if (dbStore != null) {
+ currentBatchOperation = dbStore.initBatchOperation();
+ } else {
+ currentBatchOperation = new RDBBatchOperation();
+ }
+ }
+ return currentBatchOperation;
+ }
+
+ @Override
+ public void updateLatestTrxInfo(SCMTransactionInfo info) {
+
+ }
+
+ @Override
+ public SCMTransactionInfo getLatestTrxInfo() {
+ return null;
+ }
+
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ return null;
+ }
+
+ @Override
+ public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
+
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (dbStore != null) {
+ dbStore.commitBatchOperation(currentBatchOperation);
+ currentBatchOperation.close();
+ currentBatchOperation = null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index bf25ad5..116994e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.scm.ha;
-
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -45,17 +44,28 @@ public final class MockSCMHAManager implements SCMHAManager {
private final SCMRatisServer ratisServer;
private boolean isLeader;
+ private DBTransactionBuffer transactionBuffer;
public static SCMHAManager getInstance(boolean isLeader) {
return new MockSCMHAManager(isLeader);
}
+ public static SCMHAManager getInstance(boolean isLeader,
+ DBTransactionBuffer buffer) {
+ return new MockSCMHAManager(isLeader, buffer);
+ }
+
/**
* Creates MockSCMHAManager instance.
*/
private MockSCMHAManager(boolean isLeader) {
+ this(isLeader, new MockDBTransactionBuffer());
+ }
+
+ private MockSCMHAManager(boolean isLeader, DBTransactionBuffer buffer) {
this.ratisServer = new MockRatisServer();
this.isLeader = isLeader;
+ this.transactionBuffer = buffer;
}
@Override
@@ -82,6 +92,11 @@ public final class MockSCMHAManager implements SCMHAManager {
return ratisServer;
}
+ @Override
+ public DBTransactionBuffer getDBTransactionBuffer() {
+ return transactionBuffer;
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
new file mode 100644
index 0000000..e5af076
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+/**
+ * This is a transaction buffer that buffers SCM DB operations for Pipeline and
+ * Container. When this buffer is flushed to DB, a transaction info is also
+ * written into DB to indicate the term and transaction index of the latest
+ * operation in DB.
+ */
+public class SCMDBTransactionBuffer implements DBTransactionBuffer {
+ private final SCMMetadataStore metadataStore;
+ private BatchOperation currentBatchOperation;
+ private SCMTransactionInfo latestTrxInfo;
+ private SnapshotInfo latestSnapshot;
+
+ public SCMDBTransactionBuffer(SCMMetadataStore store) throws IOException {
+ this.metadataStore = store;
+
+ // initialize a batch operation during construction time
+ currentBatchOperation = this.metadataStore.getStore().initBatchOperation();
+ latestTrxInfo = store.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ if (latestTrxInfo == null) {
+ // transaction table is empty
+ latestTrxInfo =
+ SCMTransactionInfo
+ .builder()
+ .setTransactionIndex(-1)
+ .setCurrentTerm(0)
+ .build();
+ }
+ latestSnapshot = latestTrxInfo.toSnapshotInfo();
+ }
+
+ @Override
+ public BatchOperation getCurrentBatchOperation() {
+ return currentBatchOperation;
+ }
+
+ @Override
+ public void updateLatestTrxInfo(SCMTransactionInfo info) {
+ if (info.compareTo(this.latestTrxInfo) <= 0) {
+ throw new IllegalArgumentException(
+ "Updating DB buffer transaction info by an older transaction info, "
+ + "current: " + this.latestTrxInfo + ", updating to: " + info);
+ }
+ this.latestTrxInfo = info;
+ }
+
+ @Override
+ public SCMTransactionInfo getLatestTrxInfo() {
+ return this.latestTrxInfo;
+ }
+
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ return latestSnapshot;
+ }
+
+ @Override
+ public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
+ this.latestSnapshot = latestSnapshot;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ // write latest trx info into trx table in the same batch
+ Table<String, SCMTransactionInfo> transactionInfoTable
+ = metadataStore.getTransactionInfoTable();
+ transactionInfoTable.putWithBatch(currentBatchOperation,
+ TRANSACTION_INFO_KEY, latestTrxInfo);
+
+ metadataStore.getStore().commitBatchOperation(currentBatchOperation);
+ currentBatchOperation.close();
+ this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
+ // reset batch operation
+ currentBatchOperation = metadataStore.getStore().initBatchOperation();
+ }
+
+ @Override
+ public String toString() {
+ return latestTrxInfo.toString();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 8fe6d7f..c8b3ff7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -35,6 +35,11 @@ public interface SCMHAManager {
SCMRatisServer getRatisServer();
/**
+ * Returns DB transaction buffer.
+ */
+ DBTransactionBuffer getDBTransactionBuffer();
+
+ /**
* Stops the HA service.
*/
void shutdown() throws IOException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index db5e937..b795380 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -39,16 +39,18 @@ public class SCMHAManagerImpl implements SCMHAManager {
private final SCMRatisServer ratisServer;
private final ConfigurationSource conf;
+ private final SCMDBTransactionBuffer transactionBuffer;
/**
* Creates SCMHAManager instance.
*/
public SCMHAManagerImpl(final ConfigurationSource conf,
- final StorageContainerManager scm)
- throws IOException {
+ final StorageContainerManager scm) throws IOException {
this.conf = conf;
+ this.transactionBuffer =
+ new SCMDBTransactionBuffer(scm.getScmMetadataStore());
this.ratisServer = new SCMRatisServerImpl(
- conf.getObject(SCMHAConfiguration.class), conf, scm);
+ conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer);
}
/**
@@ -63,6 +65,11 @@ public class SCMHAManagerImpl implements SCMHAManager {
return ratisServer;
}
+ @Override
+ public DBTransactionBuffer getDBTransactionBuffer() {
+ return transactionBuffer;
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 3a453e3..15b2fe1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -64,9 +64,8 @@ public class SCMRatisServerImpl implements SCMRatisServer {
// TODO: Refactor and remove ConfigurationSource and use only
// SCMHAConfiguration.
SCMRatisServerImpl(final SCMHAConfiguration haConf,
- final ConfigurationSource conf,
- final StorageContainerManager scm)
- throws IOException {
+ final ConfigurationSource conf, final StorageContainerManager scm,
+ final DBTransactionBuffer buffer) throws IOException {
this.scm = scm;
this.address = haConf.getRatisBindAddress();
@@ -79,7 +78,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
.setServerId(haGrpBuilder.getPeerId())
.setGroup(haGrpBuilder.getRaftGroup())
.setProperties(serverProperties)
- .setStateMachine(new SCMStateMachine(scm, this))
+ .setStateMachine(new SCMStateMachine(scm, this, buffer))
.build();
this.division = server.getDivision(haGrpBuilder.getRaftGroupId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java
new file mode 100644
index 0000000..b2f2ed4
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.util.List;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_SPLIT_KEY;
+
+/**
+ * This class captures the snapshotIndex and term of the latest snapshot in
+ * the SCM. The Ratis server loads the snapshotInfo during startup and updates
+ * the lastApplied index to this snapshotIndex. SCM SnapshotInfo does not
+ * contain any files. It is used only to store/update the last applied index
+ * and term.
+ */
+public class SCMRatisSnapshotInfo implements SnapshotInfo {
+ private final long term;
+ private final long snapshotIndex;
+
+ public SCMRatisSnapshotInfo(long term, long index) {
+ this.term = term;
+ this.snapshotIndex = index;
+ }
+
+ @Override
+ public TermIndex getTermIndex() {
+ return TermIndex.valueOf(term, snapshotIndex);
+ }
+
+ @Override
+ public long getTerm() {
+ return term;
+ }
+
+ @Override
+ public long getIndex() {
+ return snapshotIndex;
+ }
+
+ @Override
+ public List<FileInfo> getFiles() {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(term);
+ stringBuilder.append(TRANSACTION_INFO_SPLIT_KEY);
+ stringBuilder.append(snapshotIndex);
+ return stringBuilder.toString();
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 052bf4b..aa366e1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -31,6 +31,10 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -38,6 +42,8 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.SCM_NOT_INITIALIZED;
+
/**
* TODO.
*/
@@ -48,13 +54,27 @@ public class SCMStateMachine extends BaseStateMachine {
private final StorageContainerManager scm;
private final SCMRatisServer ratisServer;
private final Map<RequestType, Object> handlers;
-
+ private final DBTransactionBuffer transactionBuffer;
public SCMStateMachine(final StorageContainerManager scm,
- final SCMRatisServer ratisServer) {
+ final SCMRatisServer ratisServer, DBTransactionBuffer buffer)
+ throws SCMException {
this.scm = scm;
this.ratisServer = ratisServer;
this.handlers = new EnumMap<>(RequestType.class);
+ this.transactionBuffer = buffer;
+ SCMTransactionInfo latestTrxInfo =
+ this.transactionBuffer.getLatestTrxInfo();
+ if (!latestTrxInfo.isInitialized()) {
+ if (!updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
+ latestTrxInfo.getTransactionIndex())) {
+ throw new SCMException(
+ String.format("Failed to update LastAppliedTermIndex " +
+ "in StateMachine to term:{} index:{}",
+ latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
+ ), SCM_NOT_INITIALIZED);
+ }
+ }
}
public void registerHandler(RequestType type, Object handler) {
@@ -62,6 +82,11 @@ public class SCMStateMachine extends BaseStateMachine {
}
@Override
+ public SnapshotInfo getLatestSnapshot() {
+ return transactionBuffer.getLatestSnapshot();
+ }
+
+ @Override
public CompletableFuture<Message> applyTransaction(
final TransactionContext trx) {
final CompletableFuture<Message> applyTransactionFuture =
@@ -70,14 +95,17 @@ public class SCMStateMachine extends BaseStateMachine {
final SCMRatisRequest request = SCMRatisRequest.decode(
Message.valueOf(trx.getStateMachineLogEntry().getLogData()));
applyTransactionFuture.complete(process(request));
+ transactionBuffer.updateLatestTrxInfo(SCMTransactionInfo.builder()
+ .setCurrentTerm(trx.getLogEntry().getTerm())
+ .setTransactionIndex(trx.getLogEntry().getIndex())
+ .build());
} catch (Exception ex) {
applyTransactionFuture.completeExceptionally(ex);
}
return applyTransactionFuture;
}
- private Message process(final SCMRatisRequest request)
- throws Exception {
+ private Message process(final SCMRatisRequest request) throws Exception {
try {
final Object handler = handlers.get(request.getType());
@@ -93,7 +121,6 @@ public class SCMStateMachine extends BaseStateMachine {
final Object result = handler.getClass().getMethod(
request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
.invoke(handler, request.getArguments());
-
return SCMRatisResponse.encode(result);
} catch (NoSuchMethodException | SecurityException ex) {
throw new InvalidProtocolBufferException(ex.getMessage());
@@ -126,4 +153,31 @@ public class SCMStateMachine extends BaseStateMachine {
LOG.info("current SCM becomes leader of term {}.", term);
scm.getScmContext().updateIsLeaderAndTerm(true, term);
}
+
+ @Override
+ public long takeSnapshot() throws IOException {
+ long startTime = Time.monotonicNow();
+ TermIndex lastTermIndex = getLastAppliedTermIndex();
+ long lastAppliedIndex = lastTermIndex.getIndex();
+ SCMTransactionInfo lastAppliedTrxInfo =
+ SCMTransactionInfo.fromTermIndex(lastTermIndex);
+ if (transactionBuffer.getLatestTrxInfo()
+ .compareTo(lastAppliedTrxInfo) < 0) {
+ transactionBuffer.updateLatestTrxInfo(
+ SCMTransactionInfo.builder()
+ .setCurrentTerm(lastTermIndex.getTerm())
+ .setTransactionIndex(lastTermIndex.getIndex())
+ .build());
+ transactionBuffer.setLatestSnapshot(
+ transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
+ } else {
+ lastAppliedIndex =
+ transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+ }
+
+ transactionBuffer.flush();
+ LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+ lastAppliedIndex, Time.monotonicNow() - startTime);
+ return lastAppliedIndex;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java
new file mode 100644
index 0000000..0140839
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.util.Objects;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_SPLIT_KEY;
+
+/**
+ * Value object holding the two coordinates of an SCM transaction.
+ * <ul>
+ *   <li>term: the term the transaction belongs to;</li>
+ *   <li>transactionIndex: a monotonically increasing index
+ *       (e.g. the Raft log index).</li>
+ * </ul>
+ * The persisted form is the two numbers joined by
+ * {@code TRANSACTION_INFO_SPLIT_KEY}.
+ */
+public final class SCMTransactionInfo {
+  private final long term;
+  private final long transactionIndex;
+
+  /**
+   * Parses the persisted string form.
+   *
+   * @param transactionInfo term and index joined by the split key
+   * @throws IllegalStateException if the value does not split into exactly
+   *                               two parts
+   */
+  private SCMTransactionInfo(String transactionInfo) {
+    final String[] parts = transactionInfo.split(TRANSACTION_INFO_SPLIT_KEY);
+    Preconditions.checkState(parts.length == 2,
+        "Incorrect TransactionInfo value");
+
+    this.term = Long.parseLong(parts[0]);
+    this.transactionIndex = Long.parseLong(parts[1]);
+  }
+
+  private SCMTransactionInfo(long currentTerm, long transactionIndex) {
+    this.term = currentTerm;
+    this.transactionIndex = transactionIndex;
+  }
+
+  /** Creates an empty builder (term 0, transaction index -1). */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** Builds an instance carrying the term and index of {@code termIndex}. */
+  public static SCMTransactionInfo fromTermIndex(TermIndex termIndex) {
+    return builder()
+        .setCurrentTerm(termIndex.getTerm())
+        .setTransactionIndex(termIndex.getIndex())
+        .build();
+  }
+
+  /** Decodes an instance from the bytes of its persisted string form. */
+  public static SCMTransactionInfo getFromByteArray(byte[] bytes) {
+    return new SCMTransactionInfo(StringUtils.bytes2String(bytes));
+  }
+
+  /** Encodes this instance as the bytes of its persisted string form. */
+  public byte[] convertToByteArray() {
+    return StringUtils.string2Bytes(
+        term + TRANSACTION_INFO_SPLIT_KEY + transactionIndex);
+  }
+
+  public long getTerm() {
+    return term;
+  }
+
+  public long getTransactionIndex() {
+    return transactionIndex;
+  }
+
+  /**
+   * Reports whether this object still holds the builder defaults.
+   *
+   * <p>NOTE: despite the name, {@code true} means term is 0 and the
+   * transaction index is -1, i.e. the values a fresh {@link Builder}
+   * produces.
+   */
+  public boolean isInitialized() {
+    return term == 0 && transactionIndex == -1;
+  }
+
+  /**
+   * Orders by term first and by transaction index within the same term.
+   *
+   * <p>NOTE: this method never returns 0. When both term and index are
+   * equal the result is -1, so callers testing {@code compareTo(x) < 0}
+   * also match an equal {@code x}.
+   *
+   * @return -1 if this is not after {@code info}, otherwise 1
+   */
+  public int compareTo(SCMTransactionInfo info) {
+    final boolean sameTerm = info.getTerm() == term;
+    final boolean notAfter = sameTerm
+        ? transactionIndex <= info.getTransactionIndex()
+        : term < info.getTerm();
+    return notAfter ? -1 : 1;
+  }
+
+  /** Converts this transaction info into a Ratis snapshot descriptor. */
+  public SnapshotInfo toSnapshotInfo() {
+    return new SCMRatisSnapshotInfo(term, transactionIndex);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // The class is final, so an instanceof test is an exact class match.
+    if (!(o instanceof SCMTransactionInfo)) {
+      return false;
+    }
+    final SCMTransactionInfo other = (SCMTransactionInfo) o;
+    return transactionIndex == other.transactionIndex && term == other.term;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(term, transactionIndex);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + term + ":" + transactionIndex + "]";
+  }
+
+  /**
+   * Builder for {@link SCMTransactionInfo}; defaults are term 0 and
+   * transaction index -1.
+   */
+  public static class Builder {
+    private long currentTerm = 0;
+    private long transactionIndex = -1;
+
+    public Builder setCurrentTerm(long term) {
+      this.currentTerm = term;
+      return this;
+    }
+
+    public Builder setTransactionIndex(long tIndex) {
+      this.transactionIndex = tIndex;
+      return this;
+    }
+
+    public SCMTransactionInfo build() {
+      return new SCMTransactionInfo(currentTerm, transactionIndex);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index fcddcdd..790b6db 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.DBDefinition;
import org.apache.hadoop.hdds.utils.db.LongCodec;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
/**
* Class defines the structure and types of the scm.db.
@@ -80,6 +82,15 @@ public class SCMDBDefinition implements DBDefinition {
ContainerInfo.class,
new ContainerInfoCodec());
+ /**
+ * Column family "scmTransactionInfos": String keys (StringCodec) mapped to
+ * SCMTransactionInfo values (SCMTransactionInfoCodec).
+ */
+ public static final DBColumnFamilyDefinition<String, SCMTransactionInfo>
+ TRANSACTIONINFO =
+ new DBColumnFamilyDefinition<>(
+ "scmTransactionInfos",
+ String.class,
+ new StringCodec(),
+ SCMTransactionInfo.class,
+ new SCMTransactionInfoCodec());
+
@Override
public String getName() {
return "scm.db";
@@ -93,6 +104,6 @@ public class SCMDBDefinition implements DBDefinition {
@Override
public DBColumnFamilyDefinition[] getColumnFamilies() {
return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
- REVOKED_CERTS, PIPELINES, CONTAINERS};
+ REVOKED_CERTS, PIPELINES, CONTAINERS, TRANSACTIONINFO};
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 0452c05..7ddd11d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -111,6 +112,11 @@ public interface SCMMetadataStore {
Table<PipelineID, Pipeline> getPipelineTable();
/**
+ * A Table that keeps the latest transaction index of the DB state.
+ */
+ Table <String, SCMTransactionInfo> getTransactionInfoTable();
+
+ /**
* Helper to create and write batch transactions.
*/
BatchOperationHandler getBatchHandler();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index 0a609c7..0ac3a60 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -40,6 +41,7 @@ import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.TRANSACTIONINFO;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +62,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
private Table<PipelineID, Pipeline> pipelineTable;
+ private Table<String, SCMTransactionInfo> transactionInfoTable;
+
private static final Logger LOG =
LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
private DBStore store;
@@ -107,6 +111,10 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
containerTable = CONTAINERS.getTable(store);
checkTableStatus(containerTable, CONTAINERS.getName());
+
+ transactionInfoTable = TRANSACTIONINFO.getTable(store);
+
+ checkTableStatus(transactionInfoTable, TRANSACTIONINFO.getName());
}
}
@@ -162,6 +170,11 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
}
@Override
+ public Table<String, SCMTransactionInfo> getTransactionInfoTable() {
+ return transactionInfoTable;
+ }
+
+ @Override
public BatchOperationHandler getBatchHandler() {
return this.store;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java
new file mode 100644
index 0000000..6c0e6e1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Codec that converts {@link SCMTransactionInfo} to and from its persisted
+ * byte array representation.
+ */
+public class SCMTransactionInfoCodec implements Codec<SCMTransactionInfo> {
+
+  /**
+   * Serializes the given transaction info.
+   *
+   * @throws NullPointerException if {@code object} is null
+   */
+  @Override
+  public byte[] toPersistedFormat(SCMTransactionInfo object)
+      throws IOException {
+    final SCMTransactionInfo info =
+        checkNotNull(object, "Null object can't be converted to byte array.");
+    return info.convertToByteArray();
+  }
+
+  /**
+   * Deserializes a transaction info from its persisted bytes.
+   *
+   * @throws NullPointerException if {@code rawData} is null
+   */
+  @Override
+  public SCMTransactionInfo fromPersistedFormat(byte[] rawData)
+      throws IOException {
+    final byte[] persisted = checkNotNull(rawData,
+        "Null byte array can't be converted " + "to real object.");
+    return SCMTransactionInfo.getFromByteArray(persisted);
+  }
+
+  /** Returns the very same instance; no copy is made. */
+  @Override
+  public SCMTransactionInfo copyObject(SCMTransactionInfo object) {
+    return object;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 286bdc9..3c88174 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -124,6 +124,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
.newBuilder().setPipelineStore(pipelineStore)
.setRatisServer(scmhaManager.getRatisServer())
.setNodeManager(nodeManager)
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
.build();
// Create PipelineFactory
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index 06bae4c..7f88a42 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -52,17 +53,19 @@ public class PipelineStateManagerV2Impl implements StateManager {
private final PipelineStateMap pipelineStateMap;
private final NodeManager nodeManager;
private Table<PipelineID, Pipeline> pipelineStore;
+ private final DBTransactionBuffer transactionBuffer;
// Protect potential contentions between RaftServer and PipelineManager.
// See https://issues.apache.org/jira/browse/HDDS-4560
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public PipelineStateManagerV2Impl(
- Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager)
- throws IOException {
+ Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager,
+ DBTransactionBuffer buffer) throws IOException {
this.pipelineStateMap = new PipelineStateMap();
this.nodeManager = nodeManager;
this.pipelineStore = pipelineStore;
+ this.transactionBuffer = buffer;
initialize();
}
@@ -90,7 +93,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
try {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
if (pipelineStore != null) {
- pipelineStore.put(pipeline.getId(), pipeline);
+ pipelineStore.putWithBatch(transactionBuffer.getCurrentBatchOperation(),
+ pipeline.getId(), pipeline);
pipelineStateMap.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
LOG.info("Created pipeline {}.", pipeline);
@@ -219,7 +223,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
try {
PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
if (pipelineStore != null) {
- pipelineStore.delete(pipelineID);
+ pipelineStore.deleteWithBatch(
+ transactionBuffer.getCurrentBatchOperation(), pipelineID);
}
Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
nodeManager.removePipeline(pipeline);
@@ -256,7 +261,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
if (pipelineStore != null) {
pipelineStateMap.updatePipelineState(pipelineID,
Pipeline.PipelineState.fromProtobuf(newState));
- pipelineStore.put(pipelineID, getPipeline(pipelineID));
+ pipelineStore.putWithBatch(transactionBuffer.getCurrentBatchOperation(),
+ pipelineID, getPipeline(pipelineID));
}
} catch (IOException ex) {
LOG.warn("Pipeline {} state update failed", pipelineID);
@@ -333,6 +339,12 @@ public class PipelineStateManagerV2Impl implements StateManager {
private Table<PipelineID, Pipeline> pipelineStore;
private NodeManager nodeManager;
private SCMRatisServer scmRatisServer;
+ private DBTransactionBuffer transactionBuffer;
+
+    /**
+     * Sets the DB transaction buffer handed to the state manager on build.
+     *
+     * @param buffer the transaction buffer to use
+     * @return this builder
+     */
+    public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
+      transactionBuffer = buffer;
+      return this;
+    }
public Builder setRatisServer(final SCMRatisServer ratisServer) {
scmRatisServer = ratisServer;
@@ -354,7 +366,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
Preconditions.checkNotNull(pipelineStore);
final StateManager pipelineStateManager =
- new PipelineStateManagerV2Impl(pipelineStore, nodeManager);
+ new PipelineStateManagerV2Impl(
+ pipelineStore, nodeManager, transactionBuffer);
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index b0145bb..6782f52 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -271,7 +271,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
// Creates the SCM DBs or opens them if it exists.
// A valid pointer to the store is required by all the other services below.
initalizeMetadataStore(conf, configurator);
-
// Authenticate SCM if security is enabled, this initialization can only
// be done after the metadata store is initialized.
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
@@ -868,7 +867,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
* Stop service.
*/
public void stop() {
-
try {
LOG.info("Stopping Replication Manager Service.");
replicationManager.stop();
@@ -962,6 +960,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
IOUtils.cleanupWithLogger(LOG, pipelineManager);
try {
+ scmHAManager.shutdown();
+ } catch (Exception ex) {
+ LOG.error("SCM HA Manager stop failed", ex);
+ }
+
+ try {
scmMetadataStore.stop();
} catch (Exception ex) {
LOG.error("SCM Metadata store stop failed", ex);
@@ -971,12 +975,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
ms.stop();
}
- try {
- scmHAManager.shutdown();
- } catch (Exception ex) {
- LOG.error("SCM HA Manager stop failed", ex);
- }
-
scmSafeModeManager.stop();
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java
new file mode 100644
index 0000000..f0939ab
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link SCMTransactionInfoCodec}.
+ */
+public class TestSCMTransactionInfoCodec {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private SCMTransactionInfoCodec codec;
+
+  @Before
+  public void setUp() {
+    codec = new SCMTransactionInfoCodec();
+  }
+
+  /** A value survives an encode/decode round trip unchanged. */
+  @Test
+  public void toAndFromPersistedFormat() throws IOException {
+    final SCMTransactionInfo original = new SCMTransactionInfo.Builder()
+        .setCurrentTerm(11)
+        .setTransactionIndex(100)
+        .build();
+
+    final byte[] persisted = codec.toPersistedFormat(original);
+    final SCMTransactionInfo decoded = codec.fromPersistedFormat(persisted);
+
+    Assert.assertEquals(original, decoded);
+  }
+
+  /** Decoding a null byte array is rejected with a NullPointerException. */
+  @Test
+  public void testCodecWithNullDataFromTable() throws Exception {
+    thrown.expect(NullPointerException.class);
+    codec.fromPersistedFormat(null);
+  }
+
+  /** Encoding a null object is rejected with a NullPointerException. */
+  @Test
+  public void testCodecWithNullDataFromUser() throws Exception {
+    thrown.expect(NullPointerException.class);
+    codec.toPersistedFormat(null);
+  }
+
+  /** Bytes that are not a persisted transaction info are rejected. */
+  @Test
+  public void testCodecWithIncorrectValues() throws Exception {
+    final byte[] garbage = "random".getBytes(StandardCharsets.UTF_8);
+    try {
+      codec.fromPersistedFormat(garbage);
+      fail("testCodecWithIncorrectValues failed");
+    } catch (IllegalStateException ex) {
+      GenericTestUtils.assertExceptionContains(
+          "Incorrect TransactionInfo value", ex);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index c2dc995..654fd9c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
@@ -104,9 +106,21 @@ public class TestPipelineManagerImpl {
SCMContext.emptyContext());
}
+  /**
+   * Creates a pipeline manager backed by a mock HA manager that uses the
+   * supplied transaction buffer, a 20-node mock node manager and the
+   * pipeline table of the test DB store.
+   */
+  private PipelineManagerV2Impl createPipelineManager(
+      boolean isLeader, DBTransactionBuffer buffer) throws IOException {
+    final MockNodeManager nodeManager = new MockNodeManager(true, 20);
+    final Table<PipelineID, Pipeline> pipelineStore =
+        SCMDBDefinition.PIPELINES.getTable(dbStore);
+    return PipelineManagerV2Impl.newPipelineManager(
+        conf,
+        MockSCMHAManager.getInstance(isLeader, buffer),
+        nodeManager,
+        pipelineStore,
+        new EventQueue(),
+        SCMContext.emptyContext());
+  }
+
@Test
public void testCreatePipeline() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore);
+ PipelineManagerV2Impl pipelineManager =
+ createPipelineManager(true, buffer1);
Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
pipelineManager.allowPipelineCreation();
Pipeline pipeline1 = pipelineManager.createPipeline(
@@ -118,15 +132,19 @@ public class TestPipelineManagerImpl {
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
Assert.assertEquals(2, pipelineManager.getPipelines().size());
Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
+ buffer1.close();
pipelineManager.close();
- PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true);
+ DBTransactionBuffer buffer2 = new MockDBTransactionBuffer(dbStore);
+ PipelineManagerV2Impl pipelineManager2 =
+ createPipelineManager(true, buffer2);
// Should be able to load previous pipelines.
Assert.assertFalse(pipelineManager2.getPipelines().isEmpty());
Assert.assertEquals(2, pipelineManager.getPipelines().size());
pipelineManager2.allowPipelineCreation();
Pipeline pipeline3 = pipelineManager2.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ buffer2.close();
Assert.assertEquals(3, pipelineManager2.getPipelines().size());
Assert.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId()));
@@ -151,7 +169,9 @@ public class TestPipelineManagerImpl {
@Test
public void testUpdatePipelineStates() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ DBTransactionBuffer buffer = new MockDBTransactionBuffer(dbStore);
+ PipelineManagerV2Impl pipelineManager =
+ createPipelineManager(true, buffer);
Table<PipelineID, Pipeline> pipelineStore =
SCMDBDefinition.PIPELINES.getTable(dbStore);
pipelineManager.allowPipelineCreation();
@@ -160,6 +180,7 @@ public class TestPipelineManagerImpl {
Assert.assertEquals(1, pipelineManager.getPipelines().size());
Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ buffer.flush();
Assert.assertEquals(ALLOCATED,
pipelineStore.get(pipeline.getId()).getPipelineState());
PipelineID pipelineID = pipeline.getId();
@@ -170,11 +191,13 @@ public class TestPipelineManagerImpl {
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
+ buffer.flush();
Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
pipelineManager.deactivatePipeline(pipeline.getId());
Assert.assertEquals(Pipeline.PipelineState.DORMANT,
pipelineManager.getPipeline(pipelineID).getPipelineState());
+ buffer.flush();
Assert.assertEquals(Pipeline.PipelineState.DORMANT,
pipelineStore.get(pipeline.getId()).getPipelineState());
Assert.assertFalse(pipelineManager
@@ -187,6 +210,7 @@ public class TestPipelineManagerImpl {
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
+ buffer.flush();
Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
pipelineManager.close();
}
@@ -430,7 +454,9 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore);
+ PipelineManagerV2Impl pipelineManager =
+ createPipelineManager(true, buffer1);
pipelineManager.allowPipelineCreation();
pipelineManager.onMessage(
@@ -439,6 +465,7 @@ public class TestPipelineManagerImpl {
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
// close manager
+ buffer1.close();
pipelineManager.close();
// new pipeline manager loads the pipelines from the db in ALLOCATED state
pipelineManager = createPipelineManager(true);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
index a04ecea..ef579d1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -117,6 +118,11 @@ public class TestSCMStoreImplWithOldPipelineIDKeyFormat
}
@Override
+ public Table<String, SCMTransactionInfo> getTransactionInfoTable() {
+ return null;
+ }
+
+ @Override
public BatchOperationHandler getBatchHandler() {
return null;
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
new file mode 100644
index 0000000..e1d43a5
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+/**
+ * Integration test checking that SCM advances and persists its latest
+ * transaction info, and that the info and the pipelines are still available
+ * after the SCM is restarted.
+ */
+public class TestSCMSnapshot {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    SCMHAConfiguration scmhaConfiguration = conf.getObject(
+        SCMHAConfiguration.class);
+    // Use the smallest snapshot threshold for this test.
+    scmhaConfiguration.setRatisSnapshotThreshold(1L);
+    conf.setFromObject(scmhaConfiguration);
+    cluster = MiniOzoneCluster
+        .newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @Test
+  public void testSnapshot() throws Exception {
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    long snapshotInfo1 = scm.getScmHAManager().getDBTransactionBuffer()
+        .getLatestTrxInfo().getTransactionIndex();
+    ContainerManagerV2 containerManager = scm.getContainerManager();
+    PipelineManager pipelineManager = scm.getPipelineManager();
+    Pipeline ratisPipeline1 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+            RATIS, THREE, "Owner1").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline1.getId());
+    Pipeline ratisPipeline2 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+            RATIS, ONE, "Owner2").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline2.getId());
+    long snapshotInfo2 = scm.getScmHAManager().getDBTransactionBuffer()
+        .getLatestTrxInfo().getTransactionIndex();
+
+    // String.format needs %-style specifiers; "{}" is never substituted.
+    Assert.assertTrue(
+        String.format("Snapshot index 2 %d should greater than Snapshot " +
+            "index 1 %d", snapshotInfo2, snapshotInfo1),
+        snapshotInfo2 > snapshotInfo1);
+
+    Table<String, SCMTransactionInfo> trxInfo =
+        scm.getScmMetadataStore().getTransactionInfoTable();
+    SCMTransactionInfo scmTransactionInfo = trxInfo.get(TRANSACTION_INFO_KEY);
+
+    Assert.assertTrue(
+        "DB trx info:" + scmTransactionInfo.getTransactionIndex()
+            + ", latestSnapshotInfo:" + snapshotInfo2,
+        scmTransactionInfo.getTransactionIndex() >= snapshotInfo2);
+
+    cluster.restartStorageContainerManager(false);
+    // The restart replaces the cluster's SCM instance, so re-fetch it (and
+    // its pipeline manager); otherwise the checks below would only look at
+    // the stopped, pre-restart SCM.
+    scm = cluster.getStorageContainerManager();
+    pipelineManager = scm.getPipelineManager();
+    SCMTransactionInfo trxInfoAfterRestart =
+        scm.getScmHAManager().getDBTransactionBuffer().getLatestTrxInfo();
+    Assert.assertTrue(
+        trxInfoAfterRestart.getTransactionIndex() >= snapshotInfo2);
+    try {
+      pipelineManager.getPipeline(ratisPipeline1.getId());
+      pipelineManager.getPipeline(ratisPipeline2.getId());
+    } catch (PipelineNotFoundException e) {
+      Assert.fail("Should not see a PipelineNotFoundException");
+    }
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 58e4597..d0cd642 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -300,6 +300,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
public void restartStorageContainerManager(boolean waitForDatanode)
throws TimeoutException, InterruptedException, IOException,
AuthenticationException {
+ LOG.info("Restarting SCM in cluster " + this.getClass());
scm.stop();
scm.join();
scm = TestUtils.getScmSimple(conf);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 4b71f44..73e260f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
@@ -66,12 +68,13 @@ public class ReconContainerManager extends ContainerManagerImpl {
*/
public ReconContainerManager(
Configuration conf,
+ DBStore store,
Table<ContainerID, ContainerInfo> containerStore,
PipelineManager pipelineManager,
StorageContainerServiceProvider scm,
ContainerSchemaManager containerSchemaManager) throws IOException {
- super(conf, MockSCMHAManager.getInstance(true),
- pipelineManager, containerStore);
+ super(conf, MockSCMHAManager.getInstance(true,
+ new MockDBTransactionBuffer(store)), pipelineManager, containerStore);
this.scmClient = scm;
this.pipelineManager = pipelineManager;
this.containerSchemaManager = containerSchemaManager;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index a406779..80648cf 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -123,6 +123,7 @@ public class ReconStorageContainerManagerFacade
ReconSCMDBDefinition.PIPELINES.getTable(dbStore),
eventQueue);
this.containerManager = new ReconContainerManager(conf,
+ dbStore,
ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
pipelineManager,
scmServiceProvider,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index f3592bc..93cf256 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -83,6 +83,7 @@ public class AbstractReconContainerManagerTest {
ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue);
containerManager = new ReconContainerManager(
conf,
+ store,
ReconSCMDBDefinition.CONTAINERS.getTable(store),
pipelineManager,
getScmServiceProvider(),
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
index c493e2c..fec673a 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.junit.Test;
/**
@@ -72,6 +73,8 @@ public class TestReconContainerManager
assertEquals(containerID, containersInPipeline.first());
// Verify container DB.
+ SCMHAManager scmhaManager = containerManager.getSCMHAManager();
+ scmhaManager.getDBTransactionBuffer().close();
assertTrue(getContainerTable().isExist(containerID));
}
@@ -95,6 +98,8 @@ public class TestReconContainerManager
assertEquals(1, containers.size());
assertEquals(containerInfo, containers.get(0));
// Verify container DB.
+ SCMHAManager scmhaManager = containerManager.getSCMHAManager();
+ scmhaManager.getDBTransactionBuffer().close();
assertTrue(getContainerTable().isExist(containerID));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org