You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/08/13 07:18:29 UTC
hadoop git commit: HDDS-308. SCM should identify a container with pending deletes using container reports. Contributed by Lokesh Jain.
Repository: hadoop
Updated Branches:
refs/heads/trunk 3ac07b720 -> a8dae0047
HDDS-308. SCM should identify a container with pending deletes using container reports. Contributed by Lokesh Jain.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8dae004
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8dae004
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8dae004
Branch: refs/heads/trunk
Commit: a8dae0047cc3d86468ffe275be0d7857784fa8ab
Parents: 3ac07b7
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Aug 13 12:47:52 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Aug 13 12:47:52 2018 +0530
----------------------------------------------------------------------
.../container/common/impl/ContainerSet.java | 3 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 4 +-
.../hadoop/hdds/scm/block/DeletedBlockLog.java | 6 +-
.../hdds/scm/block/DeletedBlockLogImpl.java | 38 ++++++----
.../hdds/scm/block/PendingDeleteHandler.java | 38 ++++++++++
.../hdds/scm/block/PendingDeleteStatusList.java | 79 ++++++++++++++++++++
.../hdds/scm/block/SCMBlockDeletingService.java | 20 ++++-
.../hdds/scm/container/ContainerMapping.java | 35 +++++++--
.../scm/container/ContainerStateManager.java | 14 ++--
.../hadoop/hdds/scm/events/SCMEvents.java | 9 +++
.../scm/server/StorageContainerManager.java | 5 ++
.../hadoop/hdds/scm/block/TestBlockManager.java | 25 -------
.../commandhandler/TestBlockDeletion.java | 55 +++++++++++++-
13 files changed, 268 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 7a6cb2d..3da09f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -215,7 +215,8 @@ public class ContainerSet {
.setReadBytes(containerData.getReadBytes())
.setWriteBytes(containerData.getWriteBytes())
.setUsed(containerData.getBytesUsed())
- .setState(getState(containerData));
+ .setState(getState(containerData))
+ .setDeleteTransactionId(containerData.getDeleteTransactionId());
crBuilder.addReports(ciBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 8e1c2cc..f3a111f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -372,9 +372,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
try {
- Map<Long, Long> deleteTransactionsMap =
- deletedBlockLog.addTransactions(containerBlocks);
- containerManager.updateDeleteTransactionId(deleteTransactionsMap);
+ deletedBlockLog.addTransactions(containerBlocks);
} catch (IOException e) {
throw new IOException(
"Skip writing the deleted blocks info to"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 2bb5686..db6c1c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -42,9 +42,10 @@ public interface DeletedBlockLog extends Closeable {
* Once DatanodeDeletedBlockTransactions is full, the scan behavior will
* stop.
* @param transactions a list of TXs will be set into.
+ * @return Mapping from containerId to latest transactionId for the container.
* @throws IOException
*/
- void getTransactions(DatanodeDeletedBlockTransactions transactions)
+ Map<Long, Long> getTransactions(DatanodeDeletedBlockTransactions transactions)
throws IOException;
/**
@@ -101,10 +102,9 @@ public interface DeletedBlockLog extends Closeable {
* number of containers) together (on success) or non (on failure).
*
* @param containerBlocksMap a map of containerBlocks.
- * @return Mapping from containerId to latest transactionId for the container.
* @throws IOException
*/
- Map<Long, Long> addTransactions(Map<Long, List<Long>> containerBlocksMap)
+ void addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index ca4e1d0..df97c27 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -45,13 +46,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import static java.lang.Math.min;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
@@ -239,21 +241,26 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
// set of dns which have successfully committed transaction txId.
dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
Long containerId = transactionResult.getContainerID();
- if (dnsWithCommittedTxn == null || containerId == null) {
- LOG.warn("Transaction txId={} commit by dnId={} failed."
- + " Corresponding entry not found.", txID, dnID);
+ if (dnsWithCommittedTxn == null) {
+ LOG.warn("Transaction txId={} commit by dnId={} for containerID={} "
+ + "failed. Corresponding entry not found.", txID, dnID,
+ containerId);
return;
}
dnsWithCommittedTxn.add(dnID);
- Collection<DatanodeDetails> containerDnsDetails =
+ Pipeline pipeline =
containerManager.getContainerWithPipeline(containerId)
- .getPipeline().getDatanodes().values();
+ .getPipeline();
+ Collection<DatanodeDetails> containerDnsDetails =
+ pipeline.getDatanodes().values();
// The delete entry can be safely removed from the log if all the
- // corresponding nodes commit the txn.
- if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) {
+ // corresponding nodes commit the txn. It is required to check that
+ // the nodes returned in the pipeline match the replication factor.
+ if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size())
+ >= pipeline.getFactor().getNumber()) {
List<UUID> containerDns = containerDnsDetails.stream()
- .map(dnDetails -> dnDetails.getUuid())
+ .map(DatanodeDetails::getUuid)
.collect(Collectors.toList());
if (dnsWithCommittedTxn.containsAll(containerDns)) {
transactionToDNsCommitMap.remove(txID);
@@ -338,15 +345,13 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
* {@inheritDoc}
*
* @param containerBlocksMap a map of containerBlocks.
- * @return Mapping from containerId to latest transactionId for the container.
* @throws IOException
*/
@Override
- public Map<Long, Long> addTransactions(
+ public void addTransactions(
Map<Long, List<Long>> containerBlocksMap)
throws IOException {
BatchOperation batch = new BatchOperation();
- Map<Long, Long> deleteTransactionsMap = new HashMap<>();
lock.lock();
try {
long currentLatestID = lastTxID;
@@ -356,13 +361,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
byte[] key = Longs.toByteArray(currentLatestID);
DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
entry.getKey(), entry.getValue());
- deleteTransactionsMap.put(entry.getKey(), currentLatestID);
batch.put(key, tx.toByteArray());
}
lastTxID = currentLatestID;
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
deletedStore.writeBatch(batch);
- return deleteTransactionsMap;
} finally {
lock.unlock();
}
@@ -376,10 +379,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
}
@Override
- public void getTransactions(DatanodeDeletedBlockTransactions transactions)
- throws IOException {
+ public Map<Long, Long> getTransactions(
+ DatanodeDeletedBlockTransactions transactions) throws IOException {
lock.lock();
try {
+ Map<Long, Long> deleteTransactionMap = new HashMap<>();
deletedStore.iterate(null, (key, value) -> {
if (!Arrays.equals(LATEST_TXID, key)) {
DeletedBlocksTransaction block = DeletedBlocksTransaction
@@ -388,6 +392,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
if (transactions.addTransaction(block,
transactionToDNsCommitMap.get(block.getTxID()))) {
+ deleteTransactionMap.put(block.getContainerID(), block.getTxID());
transactionToDNsCommitMap
.putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
}
@@ -396,6 +401,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
}
return true;
});
+ return deleteTransactionMap;
} finally {
lock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
new file mode 100644
index 0000000..736daac
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+public class PendingDeleteHandler implements
+ EventHandler<PendingDeleteStatusList> {
+
+ private SCMBlockDeletingService scmBlockDeletingService;
+
+ public PendingDeleteHandler(
+ SCMBlockDeletingService scmBlockDeletingService) {
+ this.scmBlockDeletingService = scmBlockDeletingService;
+ }
+
+ @Override
+ public void onMessage(PendingDeleteStatusList pendingDeleteStatusList,
+ EventPublisher publisher) {
+ scmBlockDeletingService.handlePendingDeletes(pendingDeleteStatusList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
new file mode 100644
index 0000000..904762d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class PendingDeleteStatusList {
+
+ private List<PendingDeleteStatus> pendingDeleteStatuses;
+ private DatanodeDetails datanodeDetails;
+
+ public PendingDeleteStatusList(DatanodeDetails datanodeDetails) {
+ this.datanodeDetails = datanodeDetails;
+ pendingDeleteStatuses = new LinkedList<>();
+ }
+
+ public void addPendingDeleteStatus(long dnDeleteTransactionId,
+ long scmDeleteTransactionId, long containerId) {
+ pendingDeleteStatuses.add(
+ new PendingDeleteStatus(dnDeleteTransactionId, scmDeleteTransactionId,
+ containerId));
+ }
+
+ public static class PendingDeleteStatus {
+ private long dnDeleteTransactionId;
+ private long scmDeleteTransactionId;
+ private long containerId;
+
+ public PendingDeleteStatus(long dnDeleteTransactionId,
+ long scmDeleteTransactionId, long containerId) {
+ this.dnDeleteTransactionId = dnDeleteTransactionId;
+ this.scmDeleteTransactionId = scmDeleteTransactionId;
+ this.containerId = containerId;
+ }
+
+ public long getDnDeleteTransactionId() {
+ return dnDeleteTransactionId;
+ }
+
+ public long getScmDeleteTransactionId() {
+ return scmDeleteTransactionId;
+ }
+
+ public long getContainerId() {
+ return containerId;
+ }
+
+ }
+
+ public List<PendingDeleteStatus> getPendingDeleteStatuses() {
+ return pendingDeleteStatuses;
+ }
+
+ public int getNumPendingDeletes() {
+ return pendingDeleteStatuses.size();
+ }
+
+ public DatanodeDetails getDatanodeDetails() {
+ return datanodeDetails;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 6f65fdd..699fd37 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -56,7 +57,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
*/
public class SCMBlockDeletingService extends BackgroundService {
- static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(SCMBlockDeletingService.class);
// ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
@@ -106,6 +107,19 @@ public class SCMBlockDeletingService extends BackgroundService {
return queue;
}
+ public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
+ DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
+ for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus : deletionStatusList
+ .getPendingDeleteStatuses()) {
+ LOG.info(
+ "Block deletion txnID mismatch in datanode {} for containerID {}."
+ + " Datanode delete txnID: {}, SCM txnID: {}",
+ dnDetails.getUuid(), deletionStatus.getContainerId(),
+ deletionStatus.getDnDeleteTransactionId(),
+ deletionStatus.getScmDeleteTransactionId());
+ }
+ }
+
private class DeletedBlockTransactionScanner
implements BackgroundTask<EmptyTaskResult> {
@@ -123,11 +137,12 @@ public class SCMBlockDeletingService extends BackgroundService {
LOG.debug("Running DeletedBlockTransactionScanner");
DatanodeDeletedBlockTransactions transactions = null;
List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+ Map<Long, Long> transactionMap = null;
if (datanodes != null) {
transactions = new DatanodeDeletedBlockTransactions(mappingService,
blockDeleteLimitSize, datanodes.size());
try {
- deletedBlockLog.getTransactions(transactions);
+ transactionMap = deletedBlockLog.getTransactions(transactions);
} catch (IOException e) {
// We may tolerant a number of failures for sometime
// but if it continues to fail, at some point we need to raise
@@ -159,6 +174,7 @@ public class SCMBlockDeletingService extends BackgroundService {
transactions.getTransactionIDList(dnId)));
}
}
+ mappingService.updateDeleteTransactionId(transactionMap);
}
if (dnTxCount > 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index d84551a..863d6c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
@@ -43,6 +45,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
@@ -86,6 +89,7 @@ public class ContainerMapping implements Mapping {
private final LeaseManager<ContainerInfo> containerLeaseManager;
private final float containerCloseThreshold;
private final ContainerCloser closer;
+ private final EventPublisher eventPublisher;
private final long size;
/**
@@ -128,6 +132,7 @@ public class ContainerMapping implements Mapping {
OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
this.containerStateManager =
new ContainerStateManager(conf, this);
+ LOG.trace("Container State Manager created.");
this.pipelineSelector = new PipelineSelector(nodeManager,
containerStateManager, conf, eventPublisher);
@@ -135,7 +140,7 @@ public class ContainerMapping implements Mapping {
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
- LOG.trace("Container State Manager created.");
+ this.eventPublisher = eventPublisher;
long containerCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
@@ -398,8 +403,13 @@ public class ContainerMapping implements Mapping {
*/
public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
throws IOException {
+ if (deleteTransactionMap == null) {
+ return;
+ }
+
lock.lock();
try {
+ BatchOperation batch = new BatchOperation();
for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
long containerID = entry.getKey();
byte[] dbKey = Longs.toByteArray(containerID);
@@ -413,10 +423,11 @@ public class ContainerMapping implements Mapping {
ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
containerInfo.updateDeleteTransactionId(entry.getValue());
- containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
- containerStateManager
- .updateDeleteTransactionId(containerID, entry.getValue());
+ batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
}
+ containerStore.writeBatch(batch);
+ containerStateManager
+ .updateDeleteTransactionId(deleteTransactionMap);
} finally {
lock.unlock();
}
@@ -484,7 +495,8 @@ public class ContainerMapping implements Mapping {
throws IOException {
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos = reports.getReportsList();
-
+ PendingDeleteStatusList pendingDeleteStatusList =
+ new PendingDeleteStatusList(datanodeDetails);
for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
containerInfos) {
byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
@@ -498,6 +510,14 @@ public class ContainerMapping implements Mapping {
HddsProtos.SCMContainerInfo newState =
reconcileState(datanodeState, knownState, datanodeDetails);
+ if (knownState.getDeleteTransactionId() > datanodeState
+ .getDeleteTransactionId()) {
+ pendingDeleteStatusList
+ .addPendingDeleteStatus(datanodeState.getDeleteTransactionId(),
+ knownState.getDeleteTransactionId(),
+ knownState.getContainerID());
+ }
+
// FIX ME: This can be optimized, we write twice to memory, where a
// single write would work well.
//
@@ -529,6 +549,11 @@ public class ContainerMapping implements Mapping {
lock.unlock();
}
}
+ if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+ eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+ pendingDeleteStatusList);
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index f0ab213..6b983a6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -47,6 +47,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -360,13 +361,14 @@ public class ContainerStateManager implements Closeable {
/**
* Update deleteTransactionId for a container.
*
- * @param containerID ContainerID of the container whose delete
- * transactionId needs to be updated.
- * @param transactionId latest transactionId to be updated for the container
+ * @param deleteTransactionMap maps containerId to its new
+ * deleteTransactionID
*/
- public void updateDeleteTransactionId(Long containerID, long transactionId) {
- containers.getContainerMap().get(ContainerID.valueof(containerID))
- .updateDeleteTransactionId(transactionId);
+ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) {
+ for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
+ containers.getContainerMap().get(ContainerID.valueof(entry.getKey()))
+ .updateDeleteTransactionId(entry.getValue());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 70b1e96..5911ce2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hdds.scm.events;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.CloseContainerStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
@@ -147,6 +148,14 @@ public final class SCMEvents {
"DeleteBlockCommandStatus");
/**
+ * This event will be triggered while processing container reports from DN
+ * when deleteTransactionID of container in report mismatches with the
+ * deleteTransactionID on SCM.
+ */
+ public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
+ new TypedEvent(PendingDeleteStatusList.class, "PendingDeleteStatus");
+
+ /**
* This is the command for ReplicationManager to handle under/over
* replication. Sent by the ContainerReportHandler after processing the
* heartbeat.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 47a9100..178e2bd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
@@ -219,6 +220,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
+ PendingDeleteHandler pendingDeleteHandler =
+ new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(scmContainerManager, node2ContainerMap,
@@ -235,6 +238,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
+ eventQueue
+ .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 7049029..2beb4e7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -107,31 +107,6 @@ public class TestBlockManager {
}
@Test
- public void testDeleteBlock() throws Exception {
- AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
- type, factor, containerOwner);
- Assert.assertNotNull(block);
- long transactionId =
- mapping.getContainer(block.getBlockID().getContainerID())
- .getDeleteTransactionId();
- Assert.assertEquals(0, transactionId);
- blockManager.deleteBlocks(Collections.singletonList(
- block.getBlockID()));
- Assert.assertEquals(++transactionId,
- mapping.getContainer(block.getBlockID().getContainerID())
- .getDeleteTransactionId());
-
- block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
- type, factor, containerOwner);
- Assert.assertNotNull(block);
- blockManager.deleteBlocks(Collections.singletonList(
- block.getBlockID()));
- Assert.assertEquals(++transactionId,
- mapping.getContainer(block.getBlockID().getContainerID())
- .getDeleteTransactionId());
- }
-
- @Test
public void testAllocateOversizedBlock() throws IOException {
long size = 6 * GB;
thrown.expectMessage("Unsupported block size");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8dae004/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index ee9aed2..badd435 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -23,6 +23,11 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -43,6 +48,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.utils.MetadataStore;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -101,7 +107,7 @@ public class TestBlockDeletion {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
- String value = RandomStringUtils.random(1000000);
+ String value = RandomStringUtils.random(10000000);
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
@@ -111,7 +117,9 @@ public class TestBlockDeletion {
OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
- out.write(value.getBytes());
+ for (int i = 0; i < 100; i++) {
+ out.write(value.getBytes());
+ }
out.close();
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
@@ -144,6 +152,49 @@ public class TestBlockDeletion {
Assert.assertTrue(!containerIdsWithDeletedBlocks.isEmpty());
// Containers in the DN and SCM should have same delete transactionIds
matchContainerTransactionIds();
+ // Containers in the DN and SCM should have same delete transactionIds
+ // after DN restart. The assertion is just to verify that the state of
+ // containerInfos in dn and scm is consistent after dn restart.
+ cluster.restartHddsDatanode(0);
+ matchContainerTransactionIds();
+
+ // verify PENDING_DELETE_STATUS event is fired
+ verifyBlockDeletionEvent();
+ }
+
+  /**
+   * Returns the log fragment that SCMBlockDeletingService is expected to emit
+   * when it handles a pending-delete event for the given container.
+   */
+  private static String pendingDeleteLogLine(long containerId) {
+    return "for containerID " + containerId + ". Datanode delete txnID";
+  }
+
+  /**
+   * Verifies the PENDING_DELETE_STATUS path.
+   *
+   * A copy of the datanode's container report, with every
+   * deleteTransactionId reset to 0, is fed to SCM. SCM should then log a
+   * pending-delete line (via SCMBlockDeletingService) for every container
+   * that has deleted blocks, and for no other container.
+   */
+  private void verifyBlockDeletionEvent()
+      throws IOException, InterruptedException {
+    LogCapturer logCapturer =
+        LogCapturer.captureLogs(SCMBlockDeletingService.LOG);
+    try {
+      // Create dummy container reports with deleteTransactionId set as 0
+      ContainerReportsProto containerReport =
+          dnContainerSet.getContainerReport();
+      ContainerReportsProto.Builder dummyReportsBuilder =
+          ContainerReportsProto.newBuilder();
+      for (ContainerInfo containerInfo : containerReport.getReportsList()) {
+        dummyReportsBuilder.addReports(
+            ContainerInfo.newBuilder(containerInfo).setDeleteTransactionId(0)
+                .build());
+      }
+      ContainerReportsProto dummyReport = dummyReportsBuilder.build();
+
+      logCapturer.clearOutput();
+      scm.getScmContainerManager().processContainerReports(
+          cluster.getHddsDatanodes().get(0).getDatanodeDetails(),
+          dummyReport);
+
+      // The event is handled asynchronously by the event queue. Poll until
+      // every expected line shows up instead of sleeping a fixed interval,
+      // which is racy on slow machines.
+      try {
+        GenericTestUtils.waitFor(() -> {
+          String current = logCapturer.getOutput();
+          for (ContainerInfo info : dummyReport.getReportsList()) {
+            long id = info.getContainerID();
+            if (containerIdsWithDeletedBlocks.contains(id)
+                && !current.contains(pendingDeleteLogLine(id))) {
+              return false;
+            }
+          }
+          return true;
+        }, 100, 10000);
+      } catch (java.util.concurrent.TimeoutException ignored) {
+        // Fall through: the assertions below name the container whose
+        // expected log line never appeared.
+      }
+
+      String output = logCapturer.getOutput();
+      for (ContainerInfo containerInfo : dummyReport.getReportsList()) {
+        long containerId = containerInfo.getContainerID();
+        String expectedLine = pendingDeleteLogLine(containerId);
+        // Event should be triggered only for containers which have deleted
+        // blocks
+        if (containerIdsWithDeletedBlocks.contains(containerId)) {
+          Assert.assertTrue("Missing pending-delete log for container "
+              + containerId, output.contains(expectedLine));
+        } else {
+          Assert.assertFalse("Unexpected pending-delete log for container "
+              + containerId, output.contains(expectedLine));
+        }
+      }
+    } finally {
+      // Detach the appender so later tests do not keep capturing this log.
+      logCapturer.stopCapturing();
+    }
}
private void matchContainerTransactionIds() throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org