You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 20:27:02 UTC
[50/50] [abbrv] hadoop git commit: HDFS-13076: [SPS]: Cleanup work for HDFS-10285 merge. Contributed by Rakesh R.
HDFS-13076: [SPS]: Cleanup work for HDFS-10285 merge. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0f142918
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0f142918
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0f142918
Branch: refs/heads/HDFS-10285
Commit: 0f142918a4e51a573ce9b630f97e1722eac833ce
Parents: 66fbe14
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Mon Jul 23 16:05:35 2018 -0700
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Fri Aug 10 13:23:06 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSClient.java | 26 -
.../hadoop/hdfs/protocol/ClientProtocol.java | 29 -
.../hadoop/hdfs/protocol/HdfsConstants.java | 40 -
.../ClientNamenodeProtocolTranslatorPB.java | 36 -
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 37 -
.../src/main/proto/ClientNamenodeProtocol.proto | 26 -
.../federation/router/RouterRpcServer.java | 14 -
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 -
...tNamenodeProtocolServerSideTranslatorPB.java | 39 -
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 99 -
.../hadoop/hdfs/server/balancer/ExitStatus.java | 3 +-
.../server/blockmanagement/BlockManager.java | 21 +-
.../blockmanagement/DatanodeDescriptor.java | 68 -
.../server/blockmanagement/DatanodeManager.java | 94 +-
.../hdfs/server/datanode/BPOfferService.java | 12 -
.../hadoop/hdfs/server/datanode/DataNode.java | 12 -
.../datanode/StoragePolicySatisfyWorker.java | 217 ---
.../apache/hadoop/hdfs/server/mover/Mover.java | 21 -
.../hdfs/server/namenode/FSDirXAttrOp.java | 12 -
.../hdfs/server/namenode/FSNamesystem.java | 8 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 36 -
.../sps/BlockStorageMovementNeeded.java | 121 +-
.../hdfs/server/namenode/sps/Context.java | 5 -
.../IntraSPSNameNodeBlockMoveTaskHandler.java | 63 -
.../namenode/sps/IntraSPSNameNodeContext.java | 189 --
.../sps/IntraSPSNameNodeFileIdCollector.java | 185 --
.../hdfs/server/namenode/sps/SPSService.java | 5 -
.../namenode/sps/StoragePolicySatisfier.java | 44 -
.../sps/StoragePolicySatisfyManager.java | 156 +-
.../hdfs/server/sps/ExternalSPSContext.java | 5 -
.../sps/ExternalStoragePolicySatisfier.java | 9 -
.../hadoop/hdfs/tools/StoragePolicyAdmin.java | 87 +-
.../src/main/proto/DatanodeProtocol.proto | 30 -
.../src/main/resources/hdfs-default.xml | 14 +-
.../src/site/markdown/ArchivalStorage.md | 22 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 40 +
.../server/datanode/TestBPOfferService.java | 4 -
.../TestStoragePolicySatisfyWorker.java | 241 ---
.../hadoop/hdfs/server/mover/TestMover.java | 7 +-
.../namenode/TestNameNodeReconfigure.java | 32 +-
.../TestPersistentStoragePolicySatisfier.java | 124 +-
.../TestStoragePolicySatisfierWithHA.java | 152 +-
.../TestBlockStorageMovementAttemptedItems.java | 3 +-
.../sps/TestStoragePolicySatisfier.java | 1825 ------------------
...stStoragePolicySatisfierWithStripedFile.java | 87 +-
.../sps/TestExternalStoragePolicySatisfier.java | 1433 +++++++++++++-
.../hdfs/tools/TestStoragePolicyCommands.java | 2 +-
.../TestStoragePolicySatisfyAdminCommands.java | 56 +-
48 files changed, 1517 insertions(+), 4278 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index b6f9bdd..adbb133 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -123,7 +123,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@@ -3110,10 +3109,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
- public boolean isInternalSatisfierRunning() throws IOException {
- return namenode.isInternalSatisfierRunning();
- }
-
Tracer getTracer() {
return tracer;
}
@@ -3170,25 +3165,4 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
checkOpen();
return new OpenFilesIterator(namenode, tracer, openFilesTypes, path);
}
-
- /**
- * Check the storage policy satisfy status of the path for which
- * {@link DFSClient#satisfyStoragePolicy(String)} is called.
- *
- * @return Storage policy satisfy status.
- * <ul>
- * <li>PENDING if path is in queue and not processed for satisfying
- * the policy.</li>
- * <li>IN_PROGRESS if satisfying the storage policy for path.</li>
- * <li>SUCCESS if storage policy satisfied for the path.</li>
- * <li>NOT_AVAILABLE if
- * {@link DFSClient#satisfyStoragePolicy(String)} not called for
- * path or SPS work is already finished.</li>
- * </ul>
- * @throws IOException
- */
- public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
- String path) throws IOException {
- return namenode.checkStoragePolicySatisfyPathStatus(path);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 5c51c22..e8c881b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -1757,32 +1756,4 @@ public interface ClientProtocol {
*/
@AtMostOnce
void satisfyStoragePolicy(String path) throws IOException;
-
- /**
- * Check if internal StoragePolicySatisfier is running.
- * @return true if internal StoragePolicySatisfier is running
- * @throws IOException
- */
- @Idempotent
- boolean isInternalSatisfierRunning() throws IOException;
-
- /**
- * Check the storage policy satisfy status of the path for which
- * {@link ClientProtocol#satisfyStoragePolicy(String)} is called.
- *
- * @return Storage policy satisfy status.
- * <ul>
- * <li>PENDING if path is in queue and not processed for satisfying
- * the policy.</li>
- * <li>IN_PROGRESS if satisfying the storage policy for path.</li>
- * <li>SUCCESS if storage policy satisfied for the path.</li>
- * <li>NOT_AVAILABLE if
- * {@link ClientProtocol#satisfyStoragePolicy(String)} not called for
- * path or SPS work is already finished.</li>
- * </ul>
- * @throws IOException
- */
- @Idempotent
- StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
- String path) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index ab48dcd..6de186a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -134,12 +134,6 @@ public final class HdfsConstants {
public enum StoragePolicySatisfierMode {
/**
- * This mode represents that SPS service is running inside Namenode and can
- * accept any SPS call request.
- */
- INTERNAL,
-
- /**
* This mode represents that SPS service is running outside Namenode as an
* external service and can accept any SPS call request.
*/
@@ -166,40 +160,6 @@ public final class HdfsConstants {
}
}
-
- /**
- * Storage policy satisfy path status.
- */
- public enum StoragePolicySatisfyPathStatus {
- /**
- * Scheduled but not yet processed. This will come only in case of
- * directory. Directory will be added first in "pendingWorkForDirectory"
- * queue and then later it is processed recursively.
- */
- PENDING,
-
- /**
- * Satisfying the storage policy for path.
- */
- IN_PROGRESS,
-
- /**
- * Storage policy satisfied for the path.
- */
- SUCCESS,
-
- /**
- * Few blocks failed to move and the path is still not
- * fully satisfied the storage policy.
- */
- FAILURE,
-
- /**
- * Status not available.
- */
- NOT_AVAILABLE
- }
-
public enum RollingUpgradeAction {
QUERY, PREPARE, FINALIZE;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 683ccca..e4bca51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -101,8 +100,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Append
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@@ -150,8 +147,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@@ -301,9 +296,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
private final static GetErasureCodingCodecsRequestProto
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
.newBuilder().build();
- private final static IsInternalSatisfierRunningRequestProto
- VOID_IS_SPS_RUNNING_REQUEST = IsInternalSatisfierRunningRequestProto
- .newBuilder().build();
public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
@@ -1912,18 +1904,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
- public boolean isInternalSatisfierRunning() throws IOException {
- try {
- IsInternalSatisfierRunningResponseProto rep =
- rpcProxy.isInternalSatisfierRunning(null,
- VOID_IS_SPS_RUNNING_REQUEST);
- return rep.getRunning();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
- }
-
- @Override
public QuotaUsage getQuotaUsage(String path) throws IOException {
GetQuotaUsageRequestProto req =
GetQuotaUsageRequestProto.newBuilder().setPath(path).build();
@@ -1977,20 +1957,4 @@ public class ClientNamenodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
-
- @Override
- public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
- String path) throws IOException {
- try {
- CheckStoragePolicySatisfyPathStatusRequestProto request =
- CheckStoragePolicySatisfyPathStatusRequestProto.newBuilder()
- .setSrc(path)
- .build();
- CheckStoragePolicySatisfyPathStatusResponseProto response = rpcProxy
- .checkStoragePolicySatisfyPathStatus(null, request);
- return PBHelperClient.convert(response.getStatus());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 7770e31..4a5a493 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -130,7 +130,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheF
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
@@ -3399,40 +3398,4 @@ public class PBHelperClient {
}
return typeProtos;
}
-
- public static StoragePolicySatisfyPathStatus convert(
- HdfsConstants.StoragePolicySatisfyPathStatus status) {
- switch (status) {
- case PENDING:
- return StoragePolicySatisfyPathStatus.PENDING;
- case IN_PROGRESS:
- return StoragePolicySatisfyPathStatus.IN_PROGRESS;
- case SUCCESS:
- return StoragePolicySatisfyPathStatus.SUCCESS;
- case FAILURE:
- return StoragePolicySatisfyPathStatus.FAILURE;
- case NOT_AVAILABLE:
- return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
- default:
- throw new IllegalArgumentException("Unexpected SPSStatus :" + status);
- }
- }
-
- public static HdfsConstants.StoragePolicySatisfyPathStatus convert(
- StoragePolicySatisfyPathStatus status) {
- switch (status) {
- case PENDING:
- return HdfsConstants.StoragePolicySatisfyPathStatus.PENDING;
- case IN_PROGRESS:
- return HdfsConstants.StoragePolicySatisfyPathStatus.IN_PROGRESS;
- case SUCCESS:
- return HdfsConstants.StoragePolicySatisfyPathStatus.SUCCESS;
- case FAILURE:
- return HdfsConstants.StoragePolicySatisfyPathStatus.FAILURE;
- case NOT_AVAILABLE:
- return HdfsConstants.StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
- default:
- throw new IllegalArgumentException("Unexpected SPSStatus :" + status);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index e8e3a58..49ea3f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -838,28 +838,6 @@ message SatisfyStoragePolicyResponseProto {
}
-message IsInternalSatisfierRunningRequestProto { // no parameters
-}
-
-message IsInternalSatisfierRunningResponseProto {
- required bool running = 1;
-}
-
-message CheckStoragePolicySatisfyPathStatusRequestProto { // no parameters
- required string src = 1;
-}
-
-message CheckStoragePolicySatisfyPathStatusResponseProto {
- enum StoragePolicySatisfyPathStatus {
- PENDING = 0;
- IN_PROGRESS = 1;
- SUCCESS = 2;
- FAILURE = 3;
- NOT_AVAILABLE = 4;
- }
- required StoragePolicySatisfyPathStatus status = 1;
-}
-
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@@ -1048,8 +1026,4 @@ service ClientNamenodeProtocol {
returns(ListOpenFilesResponseProto);
rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
returns(SatisfyStoragePolicyResponseProto);
- rpc isInternalSatisfierRunning(IsInternalSatisfierRunningRequestProto)
- returns(IsInternalSatisfierRunningResponseProto);
- rpc checkStoragePolicySatisfyPathStatus(CheckStoragePolicySatisfyPathStatusRequestProto)
- returns(CheckStoragePolicySatisfyPathStatusResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 36645c9..29f32a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -87,7 +87,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -2498,19 +2497,6 @@ public class RouterRpcServer extends AbstractService
}
@Override
- public boolean isInternalSatisfierRunning() throws IOException {
- checkOperation(OperationCategory.READ, false);
- return false;
- }
-
- @Override
- public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
- String path) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
- }
-
- @Override
public Long getNextSPSPath() throws IOException {
checkOperation(OperationCategory.READ, false);
// not supported
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cf383d0..5ed35b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -639,10 +639,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.storage.policy.satisfier.retry.max.attempts";
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
3;
- public static final String DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY =
- "dfs.storage.policy.satisfier.low.max-streams.preference";
- public static final boolean DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT =
- true;
public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
"dfs.storage.policy.satisfier.max.outstanding.paths";
public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index b0816cb..e51529e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -86,8 +85,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Append
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@@ -162,8 +159,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFile
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@@ -1865,22 +1860,6 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
@Override
- public IsInternalSatisfierRunningResponseProto
- isInternalSatisfierRunning(RpcController controller,
- IsInternalSatisfierRunningRequestProto req)
- throws ServiceException {
- try {
- boolean ret = server.isInternalSatisfierRunning();
- IsInternalSatisfierRunningResponseProto.Builder builder =
- IsInternalSatisfierRunningResponseProto.newBuilder();
- builder.setRunning(ret);
- return builder.build();
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
-
- @Override
public GetQuotaUsageResponseProto getQuotaUsage(
RpcController controller, GetQuotaUsageRequestProto req)
throws ServiceException {
@@ -1925,22 +1904,4 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
return VOID_SATISFYSTORAGEPOLICY_RESPONSE;
}
-
- @Override
- public CheckStoragePolicySatisfyPathStatusResponseProto
- checkStoragePolicySatisfyPathStatus(RpcController controller,
- CheckStoragePolicySatisfyPathStatusRequestProto request)
- throws ServiceException {
- try {
- StoragePolicySatisfyPathStatus status = server
- .checkStoragePolicySatisfyPathStatus(request.getSrc());
- CheckStoragePolicySatisfyPathStatusResponseProto.Builder builder =
- CheckStoragePolicySatisfyPathStatusResponseProto
- .newBuilder();
- builder.setStatus(PBHelperClient.convert(status));
- return builder.build();
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index f51f839..ac01348 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -42,11 +42,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBand
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DropSPSWorkCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
@@ -56,11 +54,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
@@ -102,8 +98,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.Block
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -111,7 +105,6 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DropSPSWorkCommand;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -143,10 +136,6 @@ public class PBHelper {
private static final RegisterCommandProto REG_CMD_PROTO =
RegisterCommandProto.newBuilder().build();
private static final RegisterCommand REG_CMD = new RegisterCommand();
- private static final DropSPSWorkCommandProto DROP_SPS_WORK_CMD_PROTO =
- DropSPSWorkCommandProto.newBuilder().build();
- private static final DropSPSWorkCommand DROP_SPS_WORK_CMD =
- new DropSPSWorkCommand();
private PBHelper() {
/** Hidden constructor */
@@ -480,10 +469,6 @@ public class PBHelper {
return PBHelper.convert(proto.getBlkIdCmd());
case BlockECReconstructionCommand:
return PBHelper.convert(proto.getBlkECReconstructionCmd());
- case BlockStorageMovementCommand:
- return PBHelper.convert(proto.getBlkStorageMovementCmd());
- case DropSPSWorkCommand:
- return DROP_SPS_WORK_CMD;
default:
return null;
}
@@ -618,15 +603,6 @@ public class PBHelper {
.setBlkECReconstructionCmd(
convert((BlockECReconstructionCommand) datanodeCommand));
break;
- case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
- builder.setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand)
- .setBlkStorageMovementCmd(
- convert((BlockStorageMovementCommand) datanodeCommand));
- break;
- case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
- builder.setCmdType(DatanodeCommandProto.Type.DropSPSWorkCommand)
- .setDropSPSWorkCmd(DROP_SPS_WORK_CMD_PROTO);
- break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -1148,79 +1124,4 @@ public class PBHelper {
return new FileRegion(block, providedStorageLocation);
}
-
- private static BlockStorageMovementCommandProto convert(
- BlockStorageMovementCommand blkStorageMovementCmd) {
- BlockStorageMovementCommandProto.Builder builder =
- BlockStorageMovementCommandProto.newBuilder();
-
- builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
- Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
- .getBlockMovingTasks();
- for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
- builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo));
- }
- return builder.build();
- }
-
- private static BlockMovingInfoProto convertBlockMovingInfo(
- BlockMovingInfo blkMovingInfo) {
- BlockMovingInfoProto.Builder builder = BlockMovingInfoProto
- .newBuilder();
- builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
-
- DatanodeInfo sourceDnInfo = blkMovingInfo.getSource();
- builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo));
-
- DatanodeInfo targetDnInfo = blkMovingInfo.getTarget();
- builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo));
-
- StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
- builder.setSourceStorageType(
- PBHelperClient.convertStorageType(sourceStorageType));
-
- StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
- builder.setTargetStorageType(
- PBHelperClient.convertStorageType(targetStorageType));
-
- return builder.build();
- }
-
- private static DatanodeCommand convert(
- BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
- Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
- List<BlockMovingInfoProto> blkSPSatisfyList =
- blkStorageMovementCmdProto.getBlockMovingInfoList();
- for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) {
- blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
- }
- return new BlockStorageMovementCommand(
- DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
- blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
- }
-
- private static BlockMovingInfo convertBlockMovingInfo(
- BlockMovingInfoProto blockStorageMovingInfoProto) {
- BlockProto blockProto = blockStorageMovingInfoProto.getBlock();
- Block block = PBHelperClient.convert(blockProto);
-
- DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto
- .getSourceDnInfo();
- DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto);
-
- DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto
- .getTargetDnInfo();
- DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto);
- StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto
- .getSourceStorageType();
- StorageType srcStorageType = PBHelperClient
- .convertStorageType(srcStorageTypeProto);
-
- StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto
- .getTargetStorageType();
- StorageType targetStorageType = PBHelperClient
- .convertStorageType(targetStorageTypeProto);
- return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo,
- srcStorageType, targetStorageType);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
index 5cf4204..6bf2986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
@@ -30,8 +30,7 @@ public enum ExitStatus {
IO_EXCEPTION(-4),
ILLEGAL_ARGUMENTS(-5),
INTERRUPTED(-6),
- UNFINALIZED_UPGRADE(-7),
- SKIPPED_DUE_TO_SPS(-8);
+ UNFINALIZED_UPGRADE(-7);
private final int code;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index bb63f2a..87bd155 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -93,7 +93,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -719,9 +718,6 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.close();
pendingReconstruction.stop();
blocksMap.close();
- if (getSPSManager() != null) {
- getSPSManager().stopGracefully();
- }
}
/** @return the datanodeManager */
@@ -3889,21 +3885,6 @@ public class BlockManager implements BlockStatsMXBean {
}
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
-
- // notify SPS about the reported block
- notifyStorageMovementAttemptFinishedBlk(storageInfo, block);
- }
-
- private void notifyStorageMovementAttemptFinishedBlk(
- DatanodeStorageInfo storageInfo, Block block) {
- if (getSPSManager() != null) {
- SPSService sps = getSPSManager().getInternalSPSService();
- if (sps.isRunning()) {
- sps.notifyStorageMovementAttemptFinishedBlk(
- storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),
- block);
- }
- }
}
private void processAndHandleReportedBlock(
@@ -5088,7 +5069,7 @@ public class BlockManager implements BlockStatsMXBean {
LOG.info("Storage policy satisfier is disabled");
return false;
}
- spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+ spsManager = new StoragePolicySatisfyManager(conf, namesystem);
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 24b948c..9c96f16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -207,14 +206,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();
- /**
- * A queue of blocks corresponding to trackID for moving its storage
- * placements by this datanode.
- */
- private final BlockQueue<BlockMovingInfo> storageMovementBlocks =
- new BlockQueue<>();
- private volatile boolean dropSPSWork = false;
-
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
@@ -369,7 +360,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.pendingCached.clear();
this.cached.clear();
this.pendingUncached.clear();
- this.storageMovementBlocks.clear();
}
public int numBlocks() {
@@ -1075,62 +1065,4 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
return false;
}
-
- /**
- * Add the block infos which needs to move its storage locations.
- *
- * @param blkMovingInfo
- * - storage mismatched block info
- */
- public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
- storageMovementBlocks.offer(blkMovingInfo);
- BlockManager.LOG
- .debug("Adding block move task " + blkMovingInfo + " to " + getName()
- + ", current queue size is " + storageMovementBlocks.size());
- }
-
- /**
- * Return the number of blocks queued up for movement.
- */
- public int getNumberOfBlocksToMoveStorages() {
- return storageMovementBlocks.size();
- }
-
- /**
- * Get the blocks to move to satisfy the storage media type.
- *
- * @param numBlocksToMoveTasks
- * total number of blocks which will be send to this datanode for
- * block movement.
- *
- * @return block infos which needs to move its storage locations or null if
- * there is no block infos to move.
- */
- public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
- List<BlockMovingInfo> blockMovingInfos = storageMovementBlocks
- .poll(numBlocksToMoveTasks);
- if (blockMovingInfos == null || blockMovingInfos.size() <= 0) {
- return null;
- }
- BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
- .size()];
- return blockMovingInfos.toArray(blkMoveArray);
- }
-
- /**
- * Set whether to drop SPS related queues at DN side.
- *
- * @param dropSPSWork
- * - true if need to drop SPS queues, otherwise false.
- */
- public synchronized void setDropSPSWork(boolean dropSPSWork) {
- this.dropSPSWork = dropSPSWork;
- }
-
- /**
- * @return true if need to drop SPS queues at DN.
- */
- public synchronized boolean shouldDropSPSWork() {
- return this.dropSPSWork;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 3542864..4173f48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -210,8 +209,6 @@ public class DatanodeManager {
*/
private final long timeBetweenResendingCachingDirectivesMs;
- private final boolean blocksToMoveLowPriority;
-
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -336,12 +333,6 @@ public class DatanodeManager {
this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
-
- // SPS configuration to decide blocks to move can share equal ratio of
- // maxtransfers with pending replica and erasure-coded reconstruction tasks
- blocksToMoveLowPriority = conf.getBoolean(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT);
}
private static long getStaleIntervalFromConf(Configuration conf,
@@ -1101,19 +1092,6 @@ public class DatanodeManager {
nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list
- // Sets dropSPSWork flag to true, to ensure that
- // DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat
- // response immediately after the node registration. This is
- // to avoid a situation, where multiple block attempt finished
- // responses coming from different datanodes. After SPS monitor time
- // out, it will retry the files which were scheduled to the
- // disconnected(for long time more than heartbeat expiry) DN, by
- // finding new datanode. Now, if the expired datanode reconnects back
- // after SPS reschedules, it leads to get different movement attempt
- // finished report from reconnected and newly datanode which is
- // attempting the block movement.
- nodeS.setDropSPSWork(true);
-
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
@@ -1691,47 +1669,18 @@ public class DatanodeManager {
final List<DatanodeCommand> cmds = new ArrayList<>();
// Allocate _approximately_ maxTransfers pending tasks to DataNode.
// NN chooses pending tasks based on the ratio between the lengths of
- // replication, erasure-coded block queues and block storage movement
- // queues.
+ // replication and erasure-coded block queues.
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
- int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages();
int totalBlocks = totalReplicateBlocks + totalECBlocks;
- if (totalBlocks > 0 || totalBlocksToMove > 0) {
- int numReplicationTasks = 0;
- int numECTasks = 0;
- int numBlocksToMoveTasks = 0;
- // Check blocksToMoveLowPriority configuration is true/false. If false,
- // then equally sharing the max transfer. Otherwise gives high priority to
- // the pending_replica/erasure-coded tasks and only the delta streams will
- // be used for blocks to move tasks.
- if (!blocksToMoveLowPriority) {
- // add blocksToMove count to total blocks so that will get equal share
- totalBlocks = totalBlocks + totalBlocksToMove;
- numReplicationTasks = (int) Math
- .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
- numECTasks = (int) Math
- .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
- numBlocksToMoveTasks = (int) Math
- .ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks);
- } else {
- // Calculate the replica and ec tasks, then pick blocksToMove if there
- // is any streams available.
- numReplicationTasks = (int) Math
- .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
- numECTasks = (int) Math
- .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
- int numTasks = numReplicationTasks + numECTasks;
- if (numTasks < maxTransfers) {
- int remainingMaxTransfers = maxTransfers - numTasks;
- numBlocksToMoveTasks = Math.min(totalBlocksToMove,
- remainingMaxTransfers);
- }
- }
+ if (totalBlocks > 0) {
+ int numReplicationTasks = (int) Math.ceil(
+ (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+ int numECTasks = (int) Math.ceil(
+ (double) (totalECBlocks * maxTransfers) / totalBlocks);
if (LOG.isDebugEnabled()) {
LOG.debug("Pending replication tasks: " + numReplicationTasks
- + " erasure-coded tasks: " + numECTasks + " blocks to move tasks: "
- + numBlocksToMoveTasks);
+ + " erasure-coded tasks: " + numECTasks);
}
// check pending replication tasks
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
@@ -1747,23 +1696,6 @@ public class DatanodeManager {
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
- // check pending block storage movement tasks
- if (nodeinfo.shouldDropSPSWork()) {
- cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
- // Set back to false to indicate that the new value has been sent to the
- // datanode.
- nodeinfo.setDropSPSWork(false);
- } else {
- // Get pending block storage movement tasks
- BlockMovingInfo[] blkStorageMovementInfos = nodeinfo
- .getBlocksToMoveStorages(numBlocksToMoveTasks);
-
- if (blkStorageMovementInfos != null) {
- cmds.add(new BlockStorageMovementCommand(
- DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId,
- Arrays.asList(blkStorageMovementInfos)));
- }
- }
}
// check block invalidation
@@ -2037,18 +1969,6 @@ public class DatanodeManager {
}
/**
- * Mark all DNs to drop SPS queues. A DNA_DROP_SPS_WORK_COMMAND will be added
- * in heartbeat response, which will indicate DN to drop SPS queues
- */
- public void addDropSPSWorkCommandsToAllDNs() {
- synchronized (this) {
- for (DatanodeDescriptor dn : datanodeMap.values()) {
- dn.setDropSPSWork(true);
- }
- }
- }
-
- /**
* Generates datanode reports for the given report type.
*
* @param type
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 1656b16..a25f6a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -795,16 +795,6 @@ class BPOfferService {
((BlockECReconstructionCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
break;
- case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
- LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
- BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
- dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
- blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks());
- break;
- case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
- LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");
- dn.getStoragePolicySatisfyWorker().dropSPSWork();
- break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
@@ -835,8 +825,6 @@ class BPOfferService {
case DatanodeProtocol.DNA_CACHE:
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
- case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
- case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 21af33f..aa044f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -386,7 +386,6 @@ public class DataNode extends ReconfigurableBase
private String dnUserName = null;
private BlockRecoveryWorker blockRecoveryWorker;
private ErasureCodingWorker ecWorker;
- private StoragePolicySatisfyWorker storagePolicySatisfyWorker;
private final Tracer tracer;
private final TracerConfigurationManager tracerConfigurationManager;
private static final int NUM_CORES = Runtime.getRuntime()
@@ -1426,9 +1425,6 @@ public class DataNode extends ReconfigurableBase
ecWorker = new ErasureCodingWorker(getConf(), this);
blockRecoveryWorker = new BlockRecoveryWorker(this);
- storagePolicySatisfyWorker =
- new StoragePolicySatisfyWorker(getConf(), this, null);
- storagePolicySatisfyWorker.start();
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(getConf());
@@ -1981,10 +1977,6 @@ public class DataNode extends ReconfigurableBase
}
}
- // stop storagePolicySatisfyWorker
- if (storagePolicySatisfyWorker != null) {
- storagePolicySatisfyWorker.stop();
- }
List<BPOfferService> bposArray = (this.blockPoolManager == null)
? new ArrayList<BPOfferService>()
: this.blockPoolManager.getAllNamenodeThreads();
@@ -3624,8 +3616,4 @@ public class DataNode extends ReconfigurableBase
}
return this.diskBalancer;
}
-
- StoragePolicySatisfyWorker getStoragePolicySatisfyWorker() {
- return storagePolicySatisfyWorker;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
deleted file mode 100644
index 0157205..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
-import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
-import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
-import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
-import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StoragePolicySatisfyWorker handles the storage policy satisfier commands.
- * These commands would be issued from NameNode as part of Datanode's heart beat
- * response. BPOfferService delegates the work to this class for handling
- * BlockStorageMovement commands.
- */
-@InterfaceAudience.Private
-public class StoragePolicySatisfyWorker {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(StoragePolicySatisfyWorker.class);
-
- private final DataNode datanode;
-
- private final int moverThreads;
- private final ExecutorService moveExecutor;
- private final CompletionService<BlockMovementAttemptFinished>
- moverCompletionService;
- private final BlockStorageMovementTracker movementTracker;
- private Daemon movementTrackerThread;
- private final BlockDispatcher blkDispatcher;
-
- public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode,
- BlocksMovementsStatusHandler handler) {
- this.datanode = datanode;
- // Defaulting to 10. This is to minimize the number of move ops.
- moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10);
- moveExecutor = initializeBlockMoverThreadPool(moverThreads);
- moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
- movementTracker = new BlockStorageMovementTracker(moverCompletionService,
- handler);
- movementTrackerThread = new Daemon(movementTracker);
- movementTrackerThread.setName("BlockStorageMovementTracker");
- DNConf dnConf = datanode.getDnConf();
- int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
- blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
- ioFileBufferSize, dnConf.getConnectToDnViaHostname());
- }
-
- /**
- * Start StoragePolicySatisfyWorker, which will start block movement tracker
- * thread to track the completion of block movements.
- */
- void start() {
- movementTrackerThread.start();
- }
-
- /**
- * Stop StoragePolicySatisfyWorker, which will terminate executor service and
- * stop block movement tracker thread.
- */
- void stop() {
- movementTracker.stopTracking();
- movementTrackerThread.interrupt();
- moveExecutor.shutdown();
- try {
- moveExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for mover thread to terminate", e);
- }
- }
-
- private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
- LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
-
- ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
- TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- new Daemon.DaemonFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = super.newThread(r);
- t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
- return t;
- }
- });
-
- moverThreadPool.allowCoreThreadTimeOut(true);
- return moverThreadPool;
- }
-
- /**
- * Handles the given set of block movement tasks. This will iterate over the
- * block movement list and submit each block movement task asynchronously in a
- * separate thread. Each task will move the block replica to the target node &
- * wait for the completion.
- *
- * @param blockPoolID block pool identifier
- *
- * @param blockMovingInfos
- * list of blocks to be moved
- */
- public void processBlockMovingTasks(final String blockPoolID,
- final Collection<BlockMovingInfo> blockMovingInfos) {
- LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
- for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
- StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
- StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
- assert sourceStorageType != targetStorageType
- : "Source and Target storage type shouldn't be same!";
- BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
- blkMovingInfo);
- moverCompletionService.submit(blockMovingTask);
- }
- }
-
- /**
- * This class encapsulates the process of moving the block replica to the
- * given target and wait for the response.
- */
- private class BlockMovingTask implements
- Callable<BlockMovementAttemptFinished> {
- private final String blockPoolID;
- private final BlockMovingInfo blkMovingInfo;
-
- BlockMovingTask(String blockPoolID, BlockMovingInfo blkMovInfo) {
- this.blockPoolID = blockPoolID;
- this.blkMovingInfo = blkMovInfo;
- }
-
- @Override
- public BlockMovementAttemptFinished call() {
- BlockMovementStatus status = moveBlock();
- return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
- blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
- blkMovingInfo.getTargetStorageType(), status);
- }
-
- private BlockMovementStatus moveBlock() {
- datanode.incrementXmitsInProgress();
- ExtendedBlock eb = new ExtendedBlock(blockPoolID,
- blkMovingInfo.getBlock());
- try {
- Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
- eb, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- new StorageType[]{blkMovingInfo.getTargetStorageType()},
- new String[0]);
- DataEncryptionKeyFactory keyFactory = datanode
- .getDataEncryptionKeyFactoryForBlock(eb);
-
- return blkDispatcher.moveBlock(blkMovingInfo,
- datanode.getSaslClient(), eb, datanode.newSocket(),
- keyFactory, accessToken);
- } catch (IOException e) {
- // TODO: handle failure retries
- LOG.warn(
- "Failed to move block:{} from src:{} to destin:{} to satisfy "
- + "storageType:{}",
- blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
- blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
- return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
- } finally {
- datanode.decrementXmitsInProgress();
- }
- }
- }
-
- /**
- * Drop the in-progress SPS work queues.
- */
- public void dropSPSWork() {
- LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
- + "So, none of the SPS Worker queued block movements will"
- + " be scheduled.");
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index af5ab2d..c7a53e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -48,8 +48,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.SecurityUtil;
@@ -658,25 +656,6 @@ public class Mover {
final Mover m = new Mover(nnc, conf, retryCount,
excludedPinnedBlocks);
- boolean spsRunning;
- try {
- spsRunning = nnc.getDistributedFileSystem().getClient()
- .isInternalSatisfierRunning();
- } catch (RemoteException e) {
- IOException cause = e.unwrapRemoteException();
- if (cause instanceof StandbyException) {
- System.err.println("Skip Standby Namenode. " + nnc.toString());
- continue;
- }
- throw e;
- }
- if (spsRunning) {
- System.err.println("Mover failed due to StoragePolicySatisfier"
- + " service running inside namenode. Exiting with status "
- + ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
- return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
- }
-
final ExitStatus r = m.run();
if (r == ExitStatus.SUCCESS) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index 3b68979..1cb414d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.security.AccessControlException;
import java.io.FileNotFoundException;
@@ -207,17 +206,6 @@ class FSDirXAttrOp {
List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
removedXAttrs);
if (existingXAttrs.size() != newXAttrs.size()) {
- for (XAttr xattr : toRemove) {
- if (XATTR_SATISFY_STORAGE_POLICY
- .equals(XAttrHelper.getPrefixedName(xattr))) {
- StoragePolicySatisfyManager spsManager =
- fsd.getBlockManager().getSPSManager();
- if (spsManager != null) {
- spsManager.getInternalSPSService().clearQueue(inode.getId());
- }
- break;
- }
- }
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
return removedXAttrs;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index afe9092..7bc9ecc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -209,7 +209,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -1363,9 +1362,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// Don't want to keep replication queues when not in Active.
blockManager.clearQueues();
blockManager.setInitializedReplQueues(false);
- if (blockManager.getSPSManager() != null) {
- blockManager.getSPSManager().stopGracefully();
- }
}
} finally {
writeUnlock("stopActiveServices");
@@ -2275,9 +2271,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
// checks sps status
boolean disabled = (blockManager.getSPSManager() == null);
- if (disabled || (blockManager
- .getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL
- && !blockManager.getSPSManager().isInternalSatisfierRunning())) {
+ if (disabled) {
throw new UnsupportedActionException(
"Cannot request to satisfy storage policy "
+ "when storage policy satisfier feature has been disabled"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 2f3325f..318f801 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -111,7 +111,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -2534,41 +2533,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
- public boolean isInternalSatisfierRunning() throws IOException {
- checkNNStartup();
- String operationName = "isInternalSatisfierRunning";
- namesystem.checkSuperuserPrivilege(operationName);
- if (nn.isStandbyState()) {
- throw new StandbyException("Not supported by Standby Namenode.");
- }
- StoragePolicySatisfyManager spsMgr =
- namesystem.getBlockManager().getSPSManager();
- boolean isInternalSatisfierRunning = (spsMgr != null
- ? spsMgr.isInternalSatisfierRunning() : false);
- namesystem.logAuditEvent(true, operationName, null);
- return isInternalSatisfierRunning;
- }
-
- @Override
- public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
- String path) throws IOException {
- checkNNStartup();
- if (nn.isStandbyState()) {
- throw new StandbyException("Not supported by Standby Namenode.");
- }
- if (namesystem.getBlockManager().getSPSManager() == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Satisfier is not running inside namenode, so status "
- + "can't be returned.");
- }
- throw new IOException("Satisfier is not running inside namenode, "
- + "so status can't be returned.");
- }
- return namesystem.getBlockManager().getSPSManager()
- .checkStoragePolicySatisfyPathStatus(path);
- }
-
- @Override
public Long getNextSPSPath() throws IOException {
checkNNStartup();
String operationName = "getNextSPSPath";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index c95dcda..b990bc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -23,14 +23,10 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,9 +56,6 @@ public class BlockStorageMovementNeeded {
private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
new HashMap<>();
- private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
- new ConcurrentHashMap<>();
-
private final Context ctxt;
private Daemon pathIdCollector;
@@ -86,9 +79,6 @@ public class BlockStorageMovementNeeded {
* - track info for satisfy the policy
*/
public synchronized void add(ItemInfo trackInfo) {
- spsStatus.put(trackInfo.getFile(),
- new StoragePolicySatisfyPathStatusInfo(
- StoragePolicySatisfyPathStatus.IN_PROGRESS));
storageMovementNeeded.add(trackInfo);
}
@@ -129,7 +119,7 @@ public class BlockStorageMovementNeeded {
if (itemInfo.getStartPath() == itemInfo.getFile()) {
return;
}
- updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
+ updatePendingDirScanStats(itemInfo.getStartPath(), 1, scanCompleted);
}
private void updatePendingDirScanStats(long startPath, int numScannedFiles,
@@ -181,7 +171,6 @@ public class BlockStorageMovementNeeded {
if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId);
- updateStatus(startId, isSuccess);
} else {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
if (pendingWork != null) {
@@ -189,17 +178,13 @@ public class BlockStorageMovementNeeded {
if (pendingWork.isDirWorkDone()) {
ctxt.removeSPSHint(startId);
pendingWorkForDirectory.remove(startId);
- pendingWork.setFailure(!isSuccess);
- updateStatus(startId, pendingWork.isPolicySatisfied());
}
- pendingWork.setFailure(isSuccess);
}
}
} else {
// Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied.
ctxt.removeSPSHint(trackInfo.getFile());
- updateStatus(trackInfo.getFile(), isSuccess);
}
}
@@ -216,24 +201,6 @@ public class BlockStorageMovementNeeded {
}
/**
- * Mark inode status as SUCCESS in map.
- */
- private void updateStatus(long startId, boolean isSuccess){
- StoragePolicySatisfyPathStatusInfo spsStatusInfo =
- spsStatus.get(startId);
- if (spsStatusInfo == null) {
- spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
- spsStatus.put(startId, spsStatusInfo);
- }
-
- if (isSuccess) {
- spsStatusInfo.setSuccess();
- } else {
- spsStatusInfo.setFailure();
- }
- }
-
- /**
* Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded
* and notify to clean up required resources.
* @throws IOException
@@ -277,7 +244,6 @@ public class BlockStorageMovementNeeded {
@Override
public void run() {
LOG.info("Starting SPSPathIdProcessor!.");
- long lastStatusCleanTime = 0;
Long startINode = null;
while (ctxt.isRunning()) {
try {
@@ -289,9 +255,6 @@ public class BlockStorageMovementNeeded {
// Waiting for SPS path
Thread.sleep(3000);
} else {
- spsStatus.put(startINode,
- new StoragePolicySatisfyPathStatusInfo(
- StoragePolicySatisfyPathStatus.IN_PROGRESS));
ctxt.scanAndCollectFiles(startINode);
// check if directory was empty and no child added to queue
DirPendingWorkInfo dirPendingWorkInfo =
@@ -300,15 +263,8 @@ public class BlockStorageMovementNeeded {
&& dirPendingWorkInfo.isDirWorkDone()) {
ctxt.removeSPSHint(startINode);
pendingWorkForDirectory.remove(startINode);
- updateStatus(startINode, true);
}
}
- //Clear the SPS status if status is in SUCCESS more than 5 min.
- if (Time.monotonicNow()
- - lastStatusCleanTime > statusClearanceElapsedTimeMs) {
- lastStatusCleanTime = Time.monotonicNow();
- cleanSPSStatus();
- }
startINode = null; // Current inode successfully scanned.
}
} catch (Throwable t) {
@@ -328,16 +284,6 @@ public class BlockStorageMovementNeeded {
}
}
}
-
- private synchronized void cleanSPSStatus() {
- for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
- spsStatus.entrySet().iterator(); it.hasNext();) {
- Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
- if (entry.getValue().canRemove()) {
- it.remove();
- }
- }
- }
}
/**
@@ -347,7 +293,6 @@ public class BlockStorageMovementNeeded {
private int pendingWorkCount = 0;
private boolean fullyScanned = false;
- private boolean success = true;
/**
* Increment the pending work count for directory.
@@ -378,20 +323,6 @@ public class BlockStorageMovementNeeded {
public synchronized void markScanCompleted() {
this.fullyScanned = true;
}
-
- /**
- * Return true if all the files block movement is success, otherwise false.
- */
- public boolean isPolicySatisfied() {
- return success;
- }
-
- /**
- * Set directory SPS status failed.
- */
- public void setFailure(boolean failure) {
- this.success = this.success || failure;
- }
}
public void activate() {
@@ -406,56 +337,6 @@ public class BlockStorageMovementNeeded {
}
}
- /**
- * Represent the file/directory block movement status.
- */
- static class StoragePolicySatisfyPathStatusInfo {
- private StoragePolicySatisfyPathStatus status =
- StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
- private long lastStatusUpdateTime;
-
- StoragePolicySatisfyPathStatusInfo() {
- this.lastStatusUpdateTime = 0;
- }
-
- StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) {
- this.status = status;
- this.lastStatusUpdateTime = 0;
- }
-
- private void setSuccess() {
- this.status = StoragePolicySatisfyPathStatus.SUCCESS;
- this.lastStatusUpdateTime = Time.monotonicNow();
- }
-
- private void setFailure() {
- this.status = StoragePolicySatisfyPathStatus.FAILURE;
- this.lastStatusUpdateTime = Time.monotonicNow();
- }
-
- private StoragePolicySatisfyPathStatus getStatus() {
- return status;
- }
-
- /**
- * Return true if SUCCESS status cached more then 5 min.
- */
- private boolean canRemove() {
- return (StoragePolicySatisfyPathStatus.SUCCESS == status
- || StoragePolicySatisfyPathStatus.FAILURE == status)
- && (Time.monotonicNow()
- - lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
- }
- }
-
- public StoragePolicySatisfyPathStatus getStatus(long id) {
- StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id);
- if(spsStatusInfo == null){
- return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
- }
- return spsStatusInfo.getStatus();
- }
-
@VisibleForTesting
public static void setStatusClearanceElapsedTimeMs(
long statusClearanceElapsedTimeMs) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f142918/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index d538374..afa5a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -94,11 +94,6 @@ public interface Context {
BlockStoragePolicy getStoragePolicy(byte policyId);
/**
- * Drop the SPS work in case if any previous work queued up.
- */
- void addDropPreviousSPSWorkAtDNs();
-
- /**
* Remove the hint which was added to track SPS call.
*
* @param spsPath
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org