You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:22:37 UTC
[03/52] [abbrv] hadoop git commit: Merge remote-tracking branch
'apache/trunk' into HDFS-7285
Merge remote-tracking branch 'apache/trunk' into HDFS-7285
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab56fcdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab56fcdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab56fcdb
Branch: refs/heads/HDFS-7240
Commit: ab56fcdb1219d03713b408dd3a95d7405635254d
Parents: 164cbe6 cbb2495
Author: Zhe Zhang <zh...@cloudera.com>
Authored: Thu Aug 27 16:23:41 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Tue Sep 1 14:30:25 2015 -0700
----------------------------------------------------------------------
.../server/AuthenticationFilter.java | 63 +-
.../server/AuthenticationToken.java | 12 +
.../security/authentication/util/AuthToken.java | 35 +-
.../server/TestAuthenticationFilter.java | 163 ++-
hadoop-common-project/hadoop-common/CHANGES.txt | 34 +
.../src/main/conf/log4j.properties | 13 +
.../fs/CommonConfigurationKeysPublic.java | 5 +
.../java/org/apache/hadoop/fs/CreateFlag.java | 2 +-
.../apache/hadoop/fs/TrashPolicyDefault.java | 11 +-
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 5 +-
.../main/java/org/apache/hadoop/ipc/Server.java | 60 +
.../apache/hadoop/ipc/WritableRpcEngine.java | 3 +
.../apache/hadoop/ipc/metrics/RpcMetrics.java | 48 +
.../apache/hadoop/metrics2/lib/MutableStat.java | 7 +-
.../org/apache/hadoop/metrics2/util/MBeans.java | 37 +-
.../org/apache/hadoop/util/HostsFileReader.java | 7 +-
.../main/java/org/apache/hadoop/util/Shell.java | 11 +-
.../org/apache/hadoop/util/StringUtils.java | 29 +-
.../src/main/resources/core-default.xml | 9 +
.../src/site/markdown/HttpAuthentication.md | 8 +-
.../hadoop-common/src/site/markdown/Metrics.md | 2 +
.../src/site/markdown/SingleCluster.md.vm | 2 +-
.../org/apache/hadoop/ipc/TestProtoBufRpc.java | 77 +-
.../org/apache/hadoop/test/MetricsAsserts.java | 2 +-
.../java/org/apache/hadoop/util/TestShell.java | 39 +
.../hadoop-common/src/test/proto/test.proto | 7 +
.../src/test/proto/test_rpc_service.proto | 1 +
.../dev-support/findbugsExcludeFile.xml | 10 +
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 26 +
.../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 ++
.../org/apache/hadoop/hdfs/ReplicaAccessor.java | 88 ++
.../hadoop/hdfs/ReplicaAccessorBuilder.java | 101 ++
.../hdfs/client/HdfsClientConfigKeys.java | 76 +-
.../hadoop/hdfs/client/impl/DfsClientConf.java | 794 +++++++++++++
.../hadoop/hdfs/client/impl/package-info.java | 18 +
.../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 +++
.../java/org/apache/hadoop/hdfs/net/Peer.java | 123 ++
.../hadoop/hdfs/protocol/HdfsConstants.java | 7 +
.../datatransfer/BlockConstructionStage.java | 62 +
.../datatransfer/DataTransferProtoUtil.java | 146 +++
.../datatransfer/DataTransferProtocol.java | 202 ++++
.../hadoop/hdfs/protocol/datatransfer/Op.java | 66 ++
.../hdfs/protocol/datatransfer/Sender.java | 261 +++++
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 254 +++++
.../token/block/InvalidBlockTokenException.java | 41 +
.../server/datanode/BlockMetadataHeader.java | 209 ++++
.../hdfs/server/datanode/CachingStrategy.java | 76 ++
.../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 ++
.../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 ++
.../hdfs/shortcircuit/DfsClientShmManager.java | 522 +++++++++
.../hdfs/shortcircuit/DomainSocketFactory.java | 196 ++++
.../hdfs/shortcircuit/ShortCircuitCache.java | 1066 +++++++++++++++++
.../hdfs/shortcircuit/ShortCircuitReplica.java | 352 ++++++
.../shortcircuit/ShortCircuitReplicaInfo.java | 64 ++
.../hdfs/shortcircuit/ShortCircuitShm.java | 647 +++++++++++
.../hadoop/hdfs/util/ByteArrayManager.java | 422 +++++++
.../hadoop/hdfs/util/ExactSizeInputStream.java | 125 ++
.../apache/hadoop/hdfs/util/IOUtilsClient.java | 46 +
.../apache/hadoop/hdfs/util/package-info.java | 18 +
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 20 +
.../hdfs/web/resources/CreateFlagParam.java | 48 +
.../hdfs/web/resources/CreateParentParam.java | 2 +-
.../src/main/proto/ClientDatanodeProtocol.proto | 33 -
.../src/main/proto/datatransfer.proto | 4 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 125 +-
.../dev-support/findbugsExcludeFile.xml | 10 -
.../hadoop-hdfs/src/CMakeLists.txt | 1 +
.../apache/hadoop/fs/BlockStorageLocation.java | 52 -
.../java/org/apache/hadoop/fs/HdfsVolumeId.java | 73 --
.../java/org/apache/hadoop/fs/VolumeId.java | 40 -
.../apache/hadoop/hdfs/BlockReaderFactory.java | 65 +-
.../hadoop/hdfs/BlockStorageLocationUtil.java | 368 ------
.../org/apache/hadoop/hdfs/ClientContext.java | 5 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 134 +--
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 186 ++-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 3 +-
.../org/apache/hadoop/hdfs/DataStreamer.java | 6 +-
.../hadoop/hdfs/DistributedFileSystem.java | 39 -
.../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 --
.../apache/hadoop/hdfs/ExternalBlockReader.java | 126 +++
.../apache/hadoop/hdfs/HdfsConfiguration.java | 8 +-
.../apache/hadoop/hdfs/RemoteBlockReader.java | 6 +-
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 6 +-
.../org/apache/hadoop/hdfs/XAttrHelper.java | 13 +-
.../hadoop/hdfs/client/impl/DfsClientConf.java | 765 -------------
.../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 ---
.../java/org/apache/hadoop/hdfs/net/Peer.java | 123 --
.../hdfs/protocol/ClientDatanodeProtocol.java | 19 -
.../hdfs/protocol/HdfsBlocksMetadata.java | 111 --
.../datatransfer/BlockConstructionStage.java | 62 -
.../datatransfer/DataTransferProtoUtil.java | 148 ---
.../datatransfer/DataTransferProtocol.java | 201 ----
.../hadoop/hdfs/protocol/datatransfer/Op.java | 65 --
.../hdfs/protocol/datatransfer/PipelineAck.java | 2 +-
.../hdfs/protocol/datatransfer/Receiver.java | 8 +-
.../hdfs/protocol/datatransfer/Sender.java | 261 -----
.../datatransfer/sasl/DataTransferSaslUtil.java | 2 +-
...tDatanodeProtocolServerSideTranslatorPB.java | 43 +-
.../ClientDatanodeProtocolTranslatorPB.java | 49 +-
...tNamenodeProtocolServerSideTranslatorPB.java | 6 +-
.../ClientNamenodeProtocolTranslatorPB.java | 28 +-
.../DatanodeProtocolClientSideTranslatorPB.java | 4 +-
.../InterDatanodeProtocolTranslatorPB.java | 2 +-
.../NamenodeProtocolTranslatorPB.java | 2 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 237 +---
.../token/block/InvalidBlockTokenException.java | 41 -
.../hadoop/hdfs/server/balancer/Balancer.java | 133 ++-
.../hadoop/hdfs/server/balancer/Dispatcher.java | 46 +-
.../server/blockmanagement/BlockCollection.java | 9 +-
.../hdfs/server/blockmanagement/BlockInfo.java | 59 +-
.../blockmanagement/BlockInfoContiguous.java | 11 -
.../blockmanagement/BlockInfoStriped.java | 5 -
.../server/blockmanagement/BlockManager.java | 115 +-
.../BlockPlacementPolicyDefault.java | 177 +--
.../BlockPlacementPolicyWithNodeGroup.java | 35 +-
.../BlockStoragePolicySuite.java | 5 +-
.../BlockUnderConstructionFeature.java | 31 +-
.../hdfs/server/blockmanagement/BlocksMap.java | 14 +-
.../blockmanagement/DatanodeDescriptor.java | 26 +-
.../server/blockmanagement/DatanodeManager.java | 3 +-
.../blockmanagement/DecommissionManager.java | 21 +-
.../SequentialBlockGroupIdGenerator.java | 5 +-
.../SequentialBlockIdGenerator.java | 5 +-
.../hdfs/server/common/HdfsServerConstants.java | 6 -
.../server/datanode/BlockMetadataHeader.java | 211 ----
.../hdfs/server/datanode/CachingStrategy.java | 76 --
.../hadoop/hdfs/server/datanode/DNConf.java | 17 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 44 +-
.../hdfs/server/datanode/DataXceiver.java | 23 +-
.../server/datanode/SecureDataNodeStarter.java | 4 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 13 -
.../datanode/fsdataset/impl/BlockPoolSlice.java | 2 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 64 +-
.../impl/RamDiskAsyncLazyPersistService.java | 8 +-
.../datanode/web/webhdfs/ParameterParser.java | 14 +
.../datanode/web/webhdfs/WebHdfsHandler.java | 23 +-
.../hdfs/server/namenode/BackupImage.java | 8 +-
.../namenode/ErasureCodingZoneManager.java | 2 +-
.../hdfs/server/namenode/FSDirAppendOp.java | 2 +-
.../hdfs/server/namenode/FSDirAttrOp.java | 49 +-
.../hdfs/server/namenode/FSDirConcatOp.java | 4 +-
.../hdfs/server/namenode/FSDirDeleteOp.java | 5 +-
.../hdfs/server/namenode/FSDirRenameOp.java | 7 +-
.../hdfs/server/namenode/FSDirSnapshotOp.java | 2 +
.../hdfs/server/namenode/FSDirTruncateOp.java | 2 +-
.../hdfs/server/namenode/FSDirWriteFileOp.java | 3 +-
.../hdfs/server/namenode/FSDirXAttrOp.java | 29 +-
.../hdfs/server/namenode/FSDirectory.java | 77 +-
.../hadoop/hdfs/server/namenode/FSEditLog.java | 11 +
.../hdfs/server/namenode/FSEditLogLoader.java | 8 +-
.../hadoop/hdfs/server/namenode/FSImage.java | 153 ++-
.../server/namenode/FSImageFormatPBINode.java | 11 +-
.../server/namenode/FSImageSerialization.java | 4 +-
.../hdfs/server/namenode/FSNamesystem.java | 70 +-
.../namenode/FileUnderConstructionFeature.java | 2 +-
.../hadoop/hdfs/server/namenode/INode.java | 52 +-
.../hdfs/server/namenode/INodeDirectory.java | 11 +-
.../hadoop/hdfs/server/namenode/INodeFile.java | 34 +-
.../hadoop/hdfs/server/namenode/INodeId.java | 1 +
.../hadoop/hdfs/server/namenode/NameNode.java | 176 ++-
.../hdfs/server/namenode/NamenodeFsck.java | 20 +-
.../hadoop/hdfs/server/namenode/Namesystem.java | 7 +-
.../hdfs/server/namenode/QuotaCounts.java | 10 +-
.../server/namenode/SerialNumberManager.java | 44 -
.../hdfs/server/namenode/SerialNumberMap.java | 79 ++
.../hdfs/server/namenode/XAttrFeature.java | 78 +-
.../hdfs/server/namenode/XAttrFormat.java | 161 +++
.../server/namenode/XAttrPermissionFilter.java | 6 +-
.../hdfs/server/namenode/XAttrStorage.java | 62 +-
.../namenode/metrics/FSNamesystemMBean.java | 10 +
.../snapshot/FSImageFormatPBSnapshot.java | 10 +-
.../snapshot/FileWithSnapshotFeature.java | 44 +-
.../web/resources/NamenodeWebHdfsMethods.java | 20 +-
.../hdfs/server/protocol/NamespaceInfo.java | 17 +-
.../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 --
.../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 --
.../hdfs/shortcircuit/DfsClientShmManager.java | 514 ---------
.../hdfs/shortcircuit/DomainSocketFactory.java | 194 ----
.../hdfs/shortcircuit/ShortCircuitCache.java | 1068 ------------------
.../hdfs/shortcircuit/ShortCircuitReplica.java | 349 ------
.../shortcircuit/ShortCircuitReplicaInfo.java | 64 --
.../hdfs/shortcircuit/ShortCircuitShm.java | 646 -----------
.../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 5 +
.../hadoop/hdfs/util/ByteArrayManager.java | 418 -------
.../hadoop/hdfs/util/ExactSizeInputStream.java | 125 --
.../hadoop/hdfs/util/LightWeightHashSet.java | 21 +-
.../org/apache/hadoop/hdfs/web/JsonUtil.java | 6 +-
.../src/main/resources/hdfs-default.xml | 50 +-
.../hadoop-hdfs/src/site/markdown/HdfsDesign.md | 5 +-
.../hadoop-hdfs/src/site/markdown/WebHDFS.md | 21 +-
.../org/apache/hadoop/hdfs/TestFiPipelines.java | 9 +-
.../datanode/TestFiDataTransferProtocol.java | 3 +-
.../datanode/TestFiDataTransferProtocol2.java | 5 +-
.../hadoop/fs/TestEnhancedByteBufferAccess.java | 10 +-
.../java/org/apache/hadoop/fs/TestUnbuffer.java | 7 +-
.../java/org/apache/hadoop/fs/TestVolumeId.java | 146 ---
.../fs/viewfs/TestViewFsDefaultValue.java | 8 +-
.../apache/hadoop/hdfs/BlockReaderTestUtil.java | 6 +-
.../org/apache/hadoop/hdfs/FileAppendTest4.java | 5 +-
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 1 +
.../hadoop/hdfs/TestAppendSnapshotTruncate.java | 13 +-
.../hadoop/hdfs/TestBlockReaderFactory.java | 4 +-
.../hadoop/hdfs/TestBlockReaderLocal.java | 4 +-
.../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 6 +-
.../TestClientProtocolForPipelineRecovery.java | 4 +-
.../org/apache/hadoop/hdfs/TestConnCache.java | 5 +-
.../hadoop/hdfs/TestDFSClientRetries.java | 8 +-
.../hdfs/TestDFSInotifyEventInputStream.java | 2 +-
.../apache/hadoop/hdfs/TestDFSInputStream.java | 2 +-
.../org/apache/hadoop/hdfs/TestDFSUpgrade.java | 78 +-
.../hadoop/hdfs/TestDFSUpgradeFromImage.java | 107 +-
.../hadoop/hdfs/TestDataTransferKeepalive.java | 8 +-
.../hadoop/hdfs/TestDataTransferProtocol.java | 8 +-
.../apache/hadoop/hdfs/TestDatanodeDeath.java | 5 +-
.../hadoop/hdfs/TestDisableConnCache.java | 3 +-
.../hadoop/hdfs/TestDistributedFileSystem.java | 291 +----
.../hadoop/hdfs/TestExternalBlockReader.java | 298 +++++
.../org/apache/hadoop/hdfs/TestFileAppend.java | 4 +-
.../org/apache/hadoop/hdfs/TestFileAppend2.java | 6 +-
.../org/apache/hadoop/hdfs/TestFileAppend4.java | 5 +-
.../apache/hadoop/hdfs/TestFileCreation.java | 165 +--
.../java/org/apache/hadoop/hdfs/TestHFlush.java | 3 +-
.../apache/hadoop/hdfs/TestParallelRead.java | 2 +-
.../TestParallelShortCircuitLegacyRead.java | 4 +-
.../TestParallelShortCircuitReadUnCached.java | 6 +-
.../hadoop/hdfs/TestParallelUnixDomainRead.java | 2 +-
.../org/apache/hadoop/hdfs/TestPipelines.java | 9 +-
.../java/org/apache/hadoop/hdfs/TestPread.java | 6 +-
.../java/org/apache/hadoop/hdfs/TestRead.java | 5 +-
.../hadoop/hdfs/TestRemoteBlockReader.java | 4 +-
.../hdfs/TestReplaceDatanodeOnFailure.java | 4 +-
.../hdfs/protocol/TestBlockListAsLongs.java | 9 +-
.../hadoop/hdfs/protocolPB/TestPBHelper.java | 20 +-
.../hdfs/server/balancer/TestBalancer.java | 166 ++-
.../server/blockmanagement/TestBlockInfo.java | 17 +-
.../blockmanagement/TestBlockInfoStriped.java | 4 +-
.../TestBlockInfoUnderConstruction.java | 80 --
.../blockmanagement/TestBlockManager.java | 18 +-
.../TestBlockReportRateLimiting.java | 2 -
.../blockmanagement/TestBlockTokenWithDFS.java | 6 +-
.../TestBlockUnderConstructionFeature.java | 80 ++
.../blockmanagement/TestPendingReplication.java | 1 -
.../blockmanagement/TestReplicationPolicy.java | 94 +-
.../server/datanode/SimulatedFSDataset.java | 7 -
.../server/datanode/TestBlockReplacement.java | 7 +-
.../server/datanode/TestCachingStrategy.java | 18 +-
.../datanode/TestDataNodeVolumeFailure.java | 6 +-
.../extdataset/ExternalDatasetImpl.java | 6 -
.../fsdataset/impl/LazyPersistTestCase.java | 5 +-
.../fsdataset/impl/TestDatanodeRestart.java | 7 +-
.../TestCommitBlockSynchronization.java | 9 +-
.../TestDefaultBlockPlacementPolicy.java | 49 +-
.../namenode/TestDiskspaceQuotaUpdate.java | 64 ++
.../namenode/TestFSImageWithSnapshot.java | 4 +-
.../server/namenode/TestFSNamesystemMBean.java | 34 +-
.../hadoop/hdfs/server/namenode/TestFsck.java | 56 +-
.../hdfs/server/namenode/TestINodeFile.java | 7 +-
.../namenode/TestNameNodeMetricsLogger.java | 193 ++++
.../hdfs/server/namenode/TestStartup.java | 27 +-
.../hdfs/server/namenode/TestXAttrFeature.java | 119 ++
.../namenode/snapshot/SnapshotTestHelper.java | 4 +-
.../snapshot/TestFileWithSnapshotFeature.java | 7 +-
.../snapshot/TestSnapshotBlocksMap.java | 30 +-
.../namenode/snapshot/TestSnapshotDeletion.java | 25 +-
.../snapshot/TestSnapshotReplication.java | 31 +-
.../shortcircuit/TestShortCircuitCache.java | 16 +-
.../shortcircuit/TestShortCircuitLocalRead.java | 106 +-
.../hadoop/hdfs/tools/TestDFSAdminWithHA.java | 7 +
.../hdfs/util/TestLightWeightHashSet.java | 29 +-
.../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 25 +
.../src/test/resources/hadoop-252-dfs-dir.tgz | Bin 0 -> 14112 bytes
.../src/test/resources/log4j.properties | 13 +
hadoop-mapreduce-project/CHANGES.txt | 23 +
.../apache/hadoop/mapred/LocalJobRunner.java | 27 +
.../mapreduce/lib/output/MultipleOutputs.java | 14 +-
.../hadoop/mapred/ResourceMgrDelegate.java | 7 +
.../hadoop/mapred/TestClientRedirect.java | 9 +
.../hadoop/mapred/TestLocalJobSubmission.java | 25 +
hadoop-project/pom.xml | 4 +-
.../org/apache/hadoop/tools/CopyListing.java | 15 +-
.../java/org/apache/hadoop/tools/DiffInfo.java | 32 +-
.../java/org/apache/hadoop/tools/DistCp.java | 27 +-
.../org/apache/hadoop/tools/DistCpOptions.java | 4 +-
.../org/apache/hadoop/tools/DistCpSync.java | 308 ++++-
.../apache/hadoop/tools/SimpleCopyListing.java | 151 ++-
.../org/apache/hadoop/tools/TestDistCpSync.java | 345 +++++-
.../apache/hadoop/tools/TestOptionsParser.java | 22 +-
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 3 +
.../yarn/sls/scheduler/RMNodeWrapper.java | 5 +
hadoop-yarn-project/CHANGES.txt | 48 +-
.../yarn/api/ApplicationClientProtocol.java | 18 +
.../UpdateApplicationPriorityRequest.java | 80 ++
.../UpdateApplicationPriorityResponse.java | 47 +
.../yarn/api/records/LogAggregationContext.java | 95 ++
.../hadoop/yarn/conf/YarnConfiguration.java | 36 +
.../api/ContainerLogAggregationPolicy.java | 54 +
.../yarn/server/api/ContainerLogContext.java | 71 ++
.../ResourceManagerAdministrationProtocol.java | 8 +
.../RefreshClusterMaxPriorityRequest.java | 35 +
.../RefreshClusterMaxPriorityResponse.java | 36 +
.../main/proto/applicationclient_protocol.proto | 1 +
...esourcemanager_administration_protocol.proto | 1 +
..._server_resourcemanager_service_protos.proto | 5 +
.../src/main/proto/yarn_protos.proto | 2 +
.../src/main/proto/yarn_service_protos.proto | 8 +
.../hadoop/yarn/client/api/YarnClient.java | 17 +
.../yarn/client/api/impl/YarnClientImpl.java | 11 +
.../hadoop/yarn/client/cli/ApplicationCLI.java | 29 +
.../hadoop/yarn/client/cli/RMAdminCLI.java | 15 +
.../hadoop/yarn/client/cli/TestRMAdminCLI.java | 9 +
.../hadoop/yarn/client/cli/TestYarnCLI.java | 29 +
.../ApplicationClientProtocolPBClientImpl.java | 20 +
.../ApplicationClientProtocolPBServiceImpl.java | 22 +
.../UpdateApplicationPriorityRequestPBImpl.java | 171 +++
...UpdateApplicationPriorityResponsePBImpl.java | 69 ++
.../impl/pb/LogAggregationContextPBImpl.java | 40 +
.../ContainerLogsRetentionPolicy.java | 29 -
.../nodelabels/CommonNodeLabelsManager.java | 2 +-
...nagerAdministrationProtocolPBClientImpl.java | 20 +
...agerAdministrationProtocolPBServiceImpl.java | 23 +
.../RefreshClusterMaxPriorityRequestPBImpl.java | 74 ++
...RefreshClusterMaxPriorityResponsePBImpl.java | 73 ++
.../src/main/resources/yarn-default.xml | 71 ++
.../hadoop-yarn/hadoop-yarn-registry/pom.xml | 4 +-
.../hadoop/yarn/server/webapp/AppBlock.java | 6 +-
.../yarn/server/nodemanager/NodeManager.java | 39 +-
.../nodemanager/NodeStatusUpdaterImpl.java | 259 +++--
.../application/ApplicationImpl.java | 5 +-
.../AMOnlyLogAggregationPolicy.java | 31 +
...AMOrFailedContainerLogAggregationPolicy.java | 35 +
.../AbstractContainerLogAggregationPolicy.java | 31 +
.../AllContainerLogAggregationPolicy.java | 30 +
.../logaggregation/AppLogAggregator.java | 5 +-
.../logaggregation/AppLogAggregatorImpl.java | 131 ++-
.../FailedContainerLogAggregationPolicy.java | 33 +
...edOrKilledContainerLogAggregationPolicy.java | 30 +
.../logaggregation/LogAggregationService.java | 19 +-
.../NoneContainerLogAggregationPolicy.java | 30 +
.../SampleContainerLogAggregationPolicy.java | 124 ++
.../event/LogHandlerAppStartedEvent.java | 15 +-
.../monitor/ContainersMonitorImpl.java | 10 +-
.../nodelabels/AbstractNodeLabelsProvider.java | 146 +++
.../ConfigurationNodeLabelsProvider.java | 81 ++
.../server/nodemanager/TestNodeManager.java | 50 +-
.../TestNodeStatusUpdaterForLabels.java | 76 +-
.../containermanager/TestAuxServices.java | 1 +
.../TestLogAggregationService.java | 677 +++++++++--
.../TestNonAggregatingLogHandler.java | 12 +-
.../TestConfigurationNodeLabelsProvider.java | 146 +++
.../server/resourcemanager/AdminService.java | 28 +
.../ApplicationMasterService.java | 3 +-
.../server/resourcemanager/ClientRMService.java | 76 ++
.../server/resourcemanager/RMAuditLogger.java | 2 +
.../resourcemanager/ResourceTrackerService.java | 2 +
.../resourcemanager/recovery/RMStateStore.java | 12 +-
.../recovery/RMStateUpdateAppEvent.java | 13 +
.../server/resourcemanager/rmapp/RMAppImpl.java | 3 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 8 +-
.../server/resourcemanager/rmnode/RMNode.java | 7 +-
.../resourcemanager/rmnode/RMNodeImpl.java | 15 +-
.../scheduler/AbstractYarnScheduler.java | 25 +
.../scheduler/YarnScheduler.java | 20 +
.../scheduler/capacity/CapacityScheduler.java | 25 +-
.../ClientToAMTokenSecretManagerInRM.java | 7 +
.../yarn/server/resourcemanager/MockNodes.java | 4 +
.../resourcemanager/TestClientRMService.java | 75 ++
.../resourcemanager/TestRMAdminService.java | 34 +
.../TestWorkPreservingRMRestart.java | 2 +-
.../resourcetracker/TestNMReconnect.java | 39 +
.../attempt/TestRMAppAttemptTransitions.java | 32 +
.../capacity/TestContainerAllocation.java | 12 +-
371 files changed, 15142 insertions(+), 9357 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 0000000,d46ab47..b99e3ba
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@@ -1,0 -1,777 +1,794 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs.client.impl;
+
+ import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Preconditions;
+ import org.apache.hadoop.HadoopIllegalArgumentException;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+ import org.apache.hadoop.fs.Options.ChecksumOpt;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.util.ByteArrayManager;
+ import org.apache.hadoop.ipc.Client;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Mmap;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Read;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
+
+ import java.lang.Class;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+
+ /**
+ * DFSClient configuration.
+ */
+ public class DfsClientConf {
+ private static final Logger LOG = LoggerFactory.getLogger(DfsClientConf
+ .class);
+
+ private final int hdfsTimeout; // timeout value for a DFS operation.
+
+ private final int maxFailoverAttempts;
+ private final int maxRetryAttempts;
+ private final int failoverSleepBaseMillis;
+ private final int failoverSleepMaxMillis;
+ private final int maxBlockAcquireFailures;
+ private final int datanodeSocketWriteTimeout;
+ private final int ioBufferSize;
+ private final ChecksumOpt defaultChecksumOpt;
+ private final int writePacketSize;
+ private final int writeMaxPackets;
+ private final ByteArrayManager.Conf writeByteArrayManagerConf;
+ private final int socketTimeout;
+ private final long excludedNodesCacheExpiry;
+ /** Wait time window (in msec) if BlockMissingException is caught. */
+ private final int timeWindow;
+ private final int numCachedConnRetry;
+ private final int numBlockWriteRetry;
+ private final int numBlockWriteLocateFollowingRetry;
+ private final int blockWriteLocateFollowingInitialDelayMs;
+ private final long defaultBlockSize;
+ private final long prefetchSize;
+ private final short defaultReplication;
+ private final String taskId;
+ private final FsPermission uMask;
+ private final boolean connectToDnViaHostname;
+ private final int retryTimesForGetLastBlockLength;
+ private final int retryIntervalForGetLastBlockLength;
+ private final long datanodeRestartTimeout;
+ private final long slowIoWarningThresholdMs;
+
+ private final ShortCircuitConf shortCircuitConf;
+
+ private final long hedgedReadThresholdMillis;
+ private final int hedgedReadThreadpoolSize;
+ private final List<Class<? extends ReplicaAccessorBuilder>>
+ replicaAccessorBuilderClasses;
+
++ private final int stripedReadThreadpoolSize;
++
++
+ public DfsClientConf(Configuration conf) {
+ // The hdfsTimeout is currently the same as the ipc timeout
+ hdfsTimeout = Client.getTimeout(conf);
+
+ maxRetryAttempts = conf.getInt(
+ Retry.MAX_ATTEMPTS_KEY,
+ Retry.MAX_ATTEMPTS_DEFAULT);
+ timeWindow = conf.getInt(
+ Retry.WINDOW_BASE_KEY,
+ Retry.WINDOW_BASE_DEFAULT);
+ retryTimesForGetLastBlockLength = conf.getInt(
+ Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
+ Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
+ retryIntervalForGetLastBlockLength = conf.getInt(
+ Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
+ Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
+
+ maxFailoverAttempts = conf.getInt(
+ Failover.MAX_ATTEMPTS_KEY,
+ Failover.MAX_ATTEMPTS_DEFAULT);
+ failoverSleepBaseMillis = conf.getInt(
+ Failover.SLEEPTIME_BASE_KEY,
+ Failover.SLEEPTIME_BASE_DEFAULT);
+ failoverSleepMaxMillis = conf.getInt(
+ Failover.SLEEPTIME_MAX_KEY,
+ Failover.SLEEPTIME_MAX_DEFAULT);
+
+ maxBlockAcquireFailures = conf.getInt(
+ DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
+ DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+ datanodeSocketWriteTimeout = conf.getInt(
+ DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+ HdfsConstants.WRITE_TIMEOUT);
+ ioBufferSize = conf.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ defaultChecksumOpt = getChecksumOptFromConf(conf);
+ socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ HdfsConstants.READ_TIMEOUT);
+ /** dfs.write.packet.size is an internal config variable */
+ writePacketSize = conf.getInt(
+ DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+ DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+ writeMaxPackets = conf.getInt(
+ Write.MAX_PACKETS_IN_FLIGHT_KEY,
+ Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
+
+ final boolean byteArrayManagerEnabled = conf.getBoolean(
+ Write.ByteArrayManager.ENABLED_KEY,
+ Write.ByteArrayManager.ENABLED_DEFAULT);
+ if (!byteArrayManagerEnabled) {
+ writeByteArrayManagerConf = null;
+ } else {
+ final int countThreshold = conf.getInt(
+ Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
+ Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
+ final int countLimit = conf.getInt(
+ Write.ByteArrayManager.COUNT_LIMIT_KEY,
+ Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
+ final long countResetTimePeriodMs = conf.getLong(
+ Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
+ Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
+ writeByteArrayManagerConf = new ByteArrayManager.Conf(
+ countThreshold, countLimit, countResetTimePeriodMs);
+ }
+
+ defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
+ DFS_BLOCK_SIZE_DEFAULT);
+ defaultReplication = (short) conf.getInt(
+ DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
+ taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
+ excludedNodesCacheExpiry = conf.getLong(
+ Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
+ Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
+ prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
+ 10 * defaultBlockSize);
+ numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+ DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+ numBlockWriteRetry = conf.getInt(
+ BlockWrite.RETRIES_KEY,
+ BlockWrite.RETRIES_DEFAULT);
+ numBlockWriteLocateFollowingRetry = conf.getInt(
+ BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+ BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+ blockWriteLocateFollowingInitialDelayMs = conf.getInt(
+ BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
+ BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
+ uMask = FsPermission.getUMask(conf);
+ connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
+ DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+
+ datanodeRestartTimeout = conf.getLong(
+ DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
+ DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
+ slowIoWarningThresholdMs = conf.getLong(
+ DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
+ DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+
+ shortCircuitConf = new ShortCircuitConf(conf);
+
+ hedgedReadThresholdMillis = conf.getLong(
+ HedgedRead.THRESHOLD_MILLIS_KEY,
+ HedgedRead.THRESHOLD_MILLIS_DEFAULT);
+ hedgedReadThreadpoolSize = conf.getInt(
+ HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
+ HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
+
++ stripedReadThreadpoolSize = conf.getInt(
++ HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
++ HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
++ Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
++ HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
++ " must be greater than 0.");
+ replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<Class<? extends ReplicaAccessorBuilder>>
+ loadReplicaAccessorBuilderClasses(Configuration conf)
+ {
+ String classNames[] = conf.getTrimmedStrings(
+ HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY);
+ if (classNames.length == 0) {
+ return Collections.emptyList();
+ }
+ ArrayList<Class<? extends ReplicaAccessorBuilder>> classes =
+ new ArrayList<>();
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ for (String className: classNames) {
+ try {
+ Class<? extends ReplicaAccessorBuilder> cls =
+ (Class<? extends ReplicaAccessorBuilder>)
+ classLoader.loadClass(className);
+ classes.add(cls);
+ } catch (Throwable t) {
+ LOG.warn("Unable to load " + className, t);
+ }
+ }
+ return classes;
+ }
+
+ private DataChecksum.Type getChecksumType(Configuration conf) {
+ final String checksum = conf.get(
+ DFS_CHECKSUM_TYPE_KEY,
+ DFS_CHECKSUM_TYPE_DEFAULT);
+ try {
+ return DataChecksum.Type.valueOf(checksum);
+ } catch(IllegalArgumentException iae) {
+ LOG.warn("Bad checksum type: {}. Using default {}", checksum,
+ DFS_CHECKSUM_TYPE_DEFAULT);
+ return DataChecksum.Type.valueOf(
+ DFS_CHECKSUM_TYPE_DEFAULT);
+ }
+ }
+
+ // Construct a checksum option from conf
+ private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
+ DataChecksum.Type type = getChecksumType(conf);
+ int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+ DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ return new ChecksumOpt(type, bytesPerChecksum);
+ }
+
+ /** create a DataChecksum with the given option. */
+ public DataChecksum createChecksum(ChecksumOpt userOpt) {
+ // Fill in any missing field with the default.
+ ChecksumOpt opt = ChecksumOpt.processChecksumOpt(
+ defaultChecksumOpt, userOpt);
+ DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+ opt.getChecksumType(),
+ opt.getBytesPerChecksum());
+ if (dataChecksum == null) {
+ throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+ + userOpt + ", default=" + defaultChecksumOpt
+ + ", effective=null");
+ }
+ return dataChecksum;
+ }
+
+ @VisibleForTesting
+ public int getBlockWriteLocateFollowingInitialDelayMs() {
+ return blockWriteLocateFollowingInitialDelayMs;
+ }
+
+ /**
+ * @return the hdfsTimeout
+ */
+ public int getHdfsTimeout() {
+ return hdfsTimeout;
+ }
+
+ /**
+ * @return the maxFailoverAttempts
+ */
+ public int getMaxFailoverAttempts() {
+ return maxFailoverAttempts;
+ }
+
+ /**
+ * @return the maxRetryAttempts
+ */
+ public int getMaxRetryAttempts() {
+ return maxRetryAttempts;
+ }
+
+ /**
+ * @return the failoverSleepBaseMillis
+ */
+ public int getFailoverSleepBaseMillis() {
+ return failoverSleepBaseMillis;
+ }
+
+ /**
+ * @return the failoverSleepMaxMillis
+ */
+ public int getFailoverSleepMaxMillis() {
+ return failoverSleepMaxMillis;
+ }
+
+ /**
+ * @return the maxBlockAcquireFailures
+ */
+ public int getMaxBlockAcquireFailures() {
+ return maxBlockAcquireFailures;
+ }
+
+ /**
+ * @return the datanodeSocketWriteTimeout
+ */
+ public int getDatanodeSocketWriteTimeout() {
+ return datanodeSocketWriteTimeout;
+ }
+
+ /**
+ * @return the ioBufferSize
+ */
+ public int getIoBufferSize() {
+ return ioBufferSize;
+ }
+
+ /**
+ * @return the defaultChecksumOpt
+ */
+ public ChecksumOpt getDefaultChecksumOpt() {
+ return defaultChecksumOpt;
+ }
+
+ /**
+ * @return the writePacketSize
+ */
+ public int getWritePacketSize() {
+ return writePacketSize;
+ }
+
+ /**
+ * @return the writeMaxPackets
+ */
+ public int getWriteMaxPackets() {
+ return writeMaxPackets;
+ }
+
+ /**
+ * @return the writeByteArrayManagerConf
+ */
+ public ByteArrayManager.Conf getWriteByteArrayManagerConf() {
+ return writeByteArrayManagerConf;
+ }
+
+ /**
+ * @return the socketTimeout
+ */
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ /**
+ * @return the excludedNodesCacheExpiry
+ */
+ public long getExcludedNodesCacheExpiry() {
+ return excludedNodesCacheExpiry;
+ }
+
+ /**
+ * @return the timeWindow
+ */
+ public int getTimeWindow() {
+ return timeWindow;
+ }
+
+ /**
+ * @return the numCachedConnRetry
+ */
+ public int getNumCachedConnRetry() {
+ return numCachedConnRetry;
+ }
+
+ /**
+ * @return the numBlockWriteRetry
+ */
+ public int getNumBlockWriteRetry() {
+ return numBlockWriteRetry;
+ }
+
+ /**
+ * @return the numBlockWriteLocateFollowingRetry
+ */
+ public int getNumBlockWriteLocateFollowingRetry() {
+ return numBlockWriteLocateFollowingRetry;
+ }
+
+ /**
+ * @return the defaultBlockSize
+ */
+ public long getDefaultBlockSize() {
+ return defaultBlockSize;
+ }
+
+ /**
+ * @return the prefetchSize
+ */
+ public long getPrefetchSize() {
+ return prefetchSize;
+ }
+
+ /**
+ * @return the defaultReplication
+ */
+ public short getDefaultReplication() {
+ return defaultReplication;
+ }
+
+ /**
+ * @return the taskId
+ */
+ public String getTaskId() {
+ return taskId;
+ }
+
+ /**
+ * @return the uMask
+ */
+ public FsPermission getUMask() {
+ return uMask;
+ }
+
+ /**
+ * @return the connectToDnViaHostname
+ */
+ public boolean isConnectToDnViaHostname() {
+ return connectToDnViaHostname;
+ }
+
+ /**
+ * @return the retryTimesForGetLastBlockLength
+ */
+ public int getRetryTimesForGetLastBlockLength() {
+ return retryTimesForGetLastBlockLength;
+ }
+
+ /**
+ * @return the retryIntervalForGetLastBlockLength
+ */
+ public int getRetryIntervalForGetLastBlockLength() {
+ return retryIntervalForGetLastBlockLength;
+ }
+
+ /**
+ * @return the datanodeRestartTimeout
+ */
+ public long getDatanodeRestartTimeout() {
+ return datanodeRestartTimeout;
+ }
+
+ /**
+ * @return the slowIoWarningThresholdMs
+ */
+ public long getSlowIoWarningThresholdMs() {
+ return slowIoWarningThresholdMs;
+ }
+
+ /**
+ * @return the hedgedReadThresholdMillis
+ */
+ public long getHedgedReadThresholdMillis() {
+ return hedgedReadThresholdMillis;
+ }
+
+ /**
+ * @return the hedgedReadThreadpoolSize
+ */
+ public int getHedgedReadThreadpoolSize() {
+ return hedgedReadThreadpoolSize;
+ }
+
+ /**
++ * @return the stripedReadThreadpoolSize
++ */
++ public int getStripedReadThreadpoolSize() {
++ return stripedReadThreadpoolSize;
++ }
++
++ /**
+ * @return the replicaAccessorBuilderClasses
+ */
+ public List<Class<? extends ReplicaAccessorBuilder>>
+ getReplicaAccessorBuilderClasses() {
+ return replicaAccessorBuilderClasses;
+ }
+
+ /**
+ * @return the shortCircuitConf
+ */
+ public ShortCircuitConf getShortCircuitConf() {
+ return shortCircuitConf;
+ }
+
+ /**
+ * Configuration for short-circuit reads.
+ */
+ public static class ShortCircuitConf {
+ private static final Logger LOG = DfsClientConf.LOG;
+
+ private final int socketCacheCapacity;
+ private final long socketCacheExpiry;
+
+ private final boolean useLegacyBlockReader;
+ private final boolean useLegacyBlockReaderLocal;
+ private final String domainSocketPath;
+ private final boolean skipShortCircuitChecksums;
+
+ private final int shortCircuitBufferSize;
+ private final boolean shortCircuitLocalReads;
+ private final boolean domainSocketDataTraffic;
+ private final int shortCircuitStreamsCacheSize;
+ private final long shortCircuitStreamsCacheExpiryMs;
+ private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
+
+ private final boolean shortCircuitMmapEnabled;
+ private final int shortCircuitMmapCacheSize;
+ private final long shortCircuitMmapCacheExpiryMs;
+ private final long shortCircuitMmapCacheRetryTimeout;
+ private final long shortCircuitCacheStaleThresholdMs;
+
+ private final long keyProviderCacheExpiryMs;
+
+ public ShortCircuitConf(Configuration conf) {
+ socketCacheCapacity = conf.getInt(
+ DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+ socketCacheExpiry = conf.getLong(
+ DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
+
+ useLegacyBlockReader = conf.getBoolean(
+ DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+ DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+ useLegacyBlockReaderLocal = conf.getBoolean(
+ DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+ DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
+ shortCircuitLocalReads = conf.getBoolean(
+ Read.ShortCircuit.KEY,
+ Read.ShortCircuit.DEFAULT);
+ domainSocketDataTraffic = conf.getBoolean(
+ DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
+ domainSocketPath = conf.getTrimmed(
+ DFS_DOMAIN_SOCKET_PATH_KEY,
+ DFS_DOMAIN_SOCKET_PATH_DEFAULT);
+
+ LOG.debug(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ + " = {}", useLegacyBlockReaderLocal);
+ LOG.debug(Read.ShortCircuit.KEY
+ + " = {}", shortCircuitLocalReads);
+ LOG.debug(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ + " = {}", domainSocketDataTraffic);
+ LOG.debug(DFS_DOMAIN_SOCKET_PATH_KEY
+ + " = {}", domainSocketPath);
+
+ skipShortCircuitChecksums = conf.getBoolean(
+ Read.ShortCircuit.SKIP_CHECKSUM_KEY,
+ Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
+ shortCircuitBufferSize = conf.getInt(
+ Read.ShortCircuit.BUFFER_SIZE_KEY,
+ Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
+ shortCircuitStreamsCacheSize = conf.getInt(
+ Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
+ Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
+ shortCircuitStreamsCacheExpiryMs = conf.getLong(
+ Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
+ Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
+ shortCircuitMmapEnabled = conf.getBoolean(
+ Mmap.ENABLED_KEY,
+ Mmap.ENABLED_DEFAULT);
+ shortCircuitMmapCacheSize = conf.getInt(
+ Mmap.CACHE_SIZE_KEY,
+ Mmap.CACHE_SIZE_DEFAULT);
+ shortCircuitMmapCacheExpiryMs = conf.getLong(
+ Mmap.CACHE_TIMEOUT_MS_KEY,
+ Mmap.CACHE_TIMEOUT_MS_DEFAULT);
+ shortCircuitMmapCacheRetryTimeout = conf.getLong(
+ Mmap.RETRY_TIMEOUT_MS_KEY,
+ Mmap.RETRY_TIMEOUT_MS_DEFAULT);
+ shortCircuitCacheStaleThresholdMs = conf.getLong(
+ ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
+ ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
+ shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
+ DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+ DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
+
+ keyProviderCacheExpiryMs = conf.getLong(
+ DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
+ DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
+ }
+
+ /**
+ * @return the socketCacheCapacity
+ */
+ public int getSocketCacheCapacity() {
+ return socketCacheCapacity;
+ }
+
+ /**
+ * @return the socketCacheExpiry
+ */
+ public long getSocketCacheExpiry() {
+ return socketCacheExpiry;
+ }
+
+ public boolean isUseLegacyBlockReaderLocal() {
+ return useLegacyBlockReaderLocal;
+ }
+
+ public String getDomainSocketPath() {
+ return domainSocketPath;
+ }
+
+ public boolean isShortCircuitLocalReads() {
+ return shortCircuitLocalReads;
+ }
+
+ public boolean isDomainSocketDataTraffic() {
+ return domainSocketDataTraffic;
+ }
+ /**
+ * @return the useLegacyBlockReader
+ */
+ public boolean isUseLegacyBlockReader() {
+ return useLegacyBlockReader;
+ }
+
+ /**
+ * @return the skipShortCircuitChecksums
+ */
+ public boolean isSkipShortCircuitChecksums() {
+ return skipShortCircuitChecksums;
+ }
+
+ /**
+ * @return the shortCircuitBufferSize
+ */
+ public int getShortCircuitBufferSize() {
+ return shortCircuitBufferSize;
+ }
+
+ /**
+ * @return the shortCircuitStreamsCacheSize
+ */
+ public int getShortCircuitStreamsCacheSize() {
+ return shortCircuitStreamsCacheSize;
+ }
+
+ /**
+ * @return the shortCircuitStreamsCacheExpiryMs
+ */
+ public long getShortCircuitStreamsCacheExpiryMs() {
+ return shortCircuitStreamsCacheExpiryMs;
+ }
+
+ /**
+ * @return the shortCircuitSharedMemoryWatcherInterruptCheckMs
+ */
+ public int getShortCircuitSharedMemoryWatcherInterruptCheckMs() {
+ return shortCircuitSharedMemoryWatcherInterruptCheckMs;
+ }
+
+ /**
+ * @return the shortCircuitMmapEnabled
+ */
+ public boolean isShortCircuitMmapEnabled() {
+ return shortCircuitMmapEnabled;
+ }
+
+ /**
+ * @return the shortCircuitMmapCacheSize
+ */
+ public int getShortCircuitMmapCacheSize() {
+ return shortCircuitMmapCacheSize;
+ }
+
+ /**
+ * @return the shortCircuitMmapCacheExpiryMs
+ */
+ public long getShortCircuitMmapCacheExpiryMs() {
+ return shortCircuitMmapCacheExpiryMs;
+ }
+
+ /**
+ * @return the shortCircuitMmapCacheRetryTimeout
+ */
+ public long getShortCircuitMmapCacheRetryTimeout() {
+ return shortCircuitMmapCacheRetryTimeout;
+ }
+
+ /**
+ * @return the shortCircuitCacheStaleThresholdMs
+ */
+ public long getShortCircuitCacheStaleThresholdMs() {
+ return shortCircuitCacheStaleThresholdMs;
+ }
+
+ /**
+ * @return the keyProviderCacheExpiryMs
+ */
+ public long getKeyProviderCacheExpiryMs() {
+ return keyProviderCacheExpiryMs;
+ }
+
+ public String confAsString() {
+
+ return "shortCircuitStreamsCacheSize = "
+ + shortCircuitStreamsCacheSize
+ + ", shortCircuitStreamsCacheExpiryMs = "
+ + shortCircuitStreamsCacheExpiryMs
+ + ", shortCircuitMmapCacheSize = "
+ + shortCircuitMmapCacheSize
+ + ", shortCircuitMmapCacheExpiryMs = "
+ + shortCircuitMmapCacheExpiryMs
+ + ", shortCircuitMmapCacheRetryTimeout = "
+ + shortCircuitMmapCacheRetryTimeout
+ + ", shortCircuitCacheStaleThresholdMs = "
+ + shortCircuitCacheStaleThresholdMs
+ + ", socketCacheCapacity = "
+ + socketCacheCapacity
+ + ", socketCacheExpiry = "
+ + socketCacheExpiry
+ + ", shortCircuitLocalReads = "
+ + shortCircuitLocalReads
+ + ", useLegacyBlockReaderLocal = "
+ + useLegacyBlockReaderLocal
+ + ", domainSocketDataTraffic = "
+ + domainSocketDataTraffic
+ + ", shortCircuitSharedMemoryWatcherInterruptCheckMs = "
+ + shortCircuitSharedMemoryWatcherInterruptCheckMs
+ + ", keyProviderCacheExpiryMs = "
+ + keyProviderCacheExpiryMs;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 3d19ab9,d5f4d53..7f45132
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@@ -78,17 -78,13 +78,24 @@@ public final class HdfsConstants
public static final String CLIENT_NAMENODE_PROTOCOL_NAME =
"org.apache.hadoop.hdfs.protocol.ClientProtocol";
+ /*
+ * These values correspond to the values used by the system default erasure
+ * coding policy.
+ * TODO: get these values from ec policy of the associated INodeFile
+ */
+
+ public static final byte NUM_DATA_BLOCKS = 6;
+ public static final byte NUM_PARITY_BLOCKS = 3;
+ // The chunk size for striped block which is used by erasure coding
+ public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
+
+ // Timeouts for communicating with DataNode for streaming writes/reads
+ public static final int READ_TIMEOUT = 60 * 1000;
+ public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
+ public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
+ //for write pipeline
+ public static final int WRITE_TIMEOUT_EXTENSION = 5 * 1000;
+
// SafeMode actions
public enum SafeModeAction {
SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
index 0000000,e135d8e..f908dd3
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
@@@ -1,0 -1,120 +1,126 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.EnumSet;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
++import org.apache.hadoop.util.DataChecksum;
+
+ /**
+ * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+ * replicas.
+ */
+ @InterfaceAudience.Private
+ public final class ExternalBlockReader implements BlockReader {
+ private final ReplicaAccessor accessor;
+ private final long visibleLength;
+ private long pos;
+
+ ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+ long startOffset) {
+ this.accessor = accessor;
+ this.visibleLength = visibleLength;
+ this.pos = startOffset;
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ int nread = accessor.read(pos, buf, off, len);
+ pos += nread;
+ return nread;
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ int nread = accessor.read(pos, buf);
+ pos += nread;
+ return nread;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ // You cannot skip backwards
+ if (n <= 0) {
+ return 0;
+ }
+ // You can't skip past the end of the replica.
+ long oldPos = pos;
+ pos += n;
+ if (pos > visibleLength) {
+ pos = visibleLength;
+ }
+ return pos - oldPos;
+ }
+
+ @Override
+ public int available() throws IOException {
+ // We return the amount of bytes that we haven't read yet from the
+ // replica, based on our current position. Some of the other block
+ // readers return a shorter length than that. The only advantage to
+ // returning a shorter length is that the DFSInputStream will
+ // trash your block reader and create a new one if someone tries to
+ // seek() beyond the available() region.
+ long diff = visibleLength - pos;
+ if (diff > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ } else {
+ return (int)diff;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ accessor.close();
+ }
+
+ @Override
+ public void readFully(byte[] buf, int offset, int len) throws IOException {
+ BlockReaderUtil.readFully(this, buf, offset, len);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return BlockReaderUtil.readAll(this, buf, offset, len);
+ }
+
+ @Override
+ public boolean isLocal() {
+ return accessor.isLocal();
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return accessor.isShortCircuit();
+ }
+
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ // For now, pluggable ReplicaAccessors do not support zero-copy.
+ return null;
+ }
++
++ @Override
++ public DataChecksum getDataChecksum() {
++ return null;
++ }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index c083b5e,887accf..f292ee8
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@@ -445,17 -415,12 +431,17 @@@ public class PBHelper
}
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
- return BlockWithLocationsProto.newBuilder()
- .setBlock(convert(blk.getBlock()))
+ BlockWithLocationsProto.Builder builder = BlockWithLocationsProto
+ .newBuilder().setBlock(convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
.addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
- .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()));
- .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()))
- .build();
++ .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()));
+ if (blk instanceof StripedBlockWithLocations) {
+ StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk;
+ builder.setIndices(getByteString(sblk.getIndices()));
+ builder.setDataBlockNum(sblk.getDataBlockNum());
+ }
+ return builder.build();
}
public static BlockWithLocations convert(BlockWithLocationsProto b) {
@@@ -806,26 -697,17 +723,26 @@@
StorageType[] storageTypes = b.getStorageTypes();
if (storageTypes != null) {
- for (int i = 0; i < storageTypes.length; ++i) {
- builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i]));
+ for (StorageType storageType : storageTypes) {
- builder.addStorageTypes(PBHelper.convertStorageType(storageType));
++ builder.addStorageTypes(PBHelperClient.convertStorageType(storageType));
}
}
final String[] storageIDs = b.getStorageIDs();
if (storageIDs != null) {
builder.addAllStorageIDs(Arrays.asList(storageIDs));
}
+ if (b instanceof LocatedStripedBlock) {
+ LocatedStripedBlock sb = (LocatedStripedBlock) b;
+ int[] indices = sb.getBlockIndices();
+ Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
+ for (int i = 0; i < indices.length; i++) {
+ builder.addBlockIndex(indices[i]);
- builder.addBlockTokens(PBHelper.convert(blockTokens[i]));
++ builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
+ }
+ }
- return builder.setB(PBHelper.convert(b.getBlock()))
- .setBlockToken(PBHelper.convert(b.getBlockToken()))
+ return builder.setB(PBHelperClient.convert(b.getBlock()))
+ .setBlockToken(PBHelperClient.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
@@@ -3144,191 -2880,4 +2954,192 @@@
setLeaseId(context.getLeaseId()).
build();
}
+
+ public static ECSchema convertECSchema(ECSchemaProto schema) {
+ List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
+ Map<String, String> options = new HashMap<>(optionsList.size());
+ for (ECSchemaOptionEntryProto option : optionsList) {
+ options.put(option.getKey(), option.getValue());
+ }
+ return new ECSchema(schema.getCodecName(), schema.getDataUnits(),
+ schema.getParityUnits(), options);
+ }
+
+ public static ECSchemaProto convertECSchema(ECSchema schema) {
+ ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
+ .setCodecName(schema.getCodecName())
+ .setDataUnits(schema.getNumDataUnits())
+ .setParityUnits(schema.getNumParityUnits());
+ Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
+ for (Entry<String, String> entry : entrySet) {
+ builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
+ .setKey(entry.getKey()).setValue(entry.getValue()).build());
+ }
+ return builder.build();
+ }
+
+ public static ErasureCodingPolicy convertErasureCodingPolicy(
+ ErasureCodingPolicyProto policy) {
+ return new ErasureCodingPolicy(policy.getName(),
+ convertECSchema(policy.getSchema()),
+ policy.getCellSize());
+ }
+
+ public static ErasureCodingPolicyProto convertErasureCodingPolicy(
+ ErasureCodingPolicy policy) {
+ ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto
+ .newBuilder()
+ .setName(policy.getName())
+ .setSchema(convertECSchema(policy.getSchema()))
+ .setCellSize(policy.getCellSize());
+ return builder.build();
+ }
+
+ public static ErasureCodingZoneProto convertErasureCodingZone(
+ ErasureCodingZone ecZone) {
+ return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir())
+ .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy()))
+ .build();
+ }
+
+ public static ErasureCodingZone convertErasureCodingZone(
+ ErasureCodingZoneProto ecZoneProto) {
+ return new ErasureCodingZone(ecZoneProto.getDir(),
+ convertErasureCodingPolicy(ecZoneProto.getEcPolicy()));
+ }
+
+ public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
+ BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
+ ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
+ ExtendedBlock block = convert(blockProto);
+
+ DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
+ .getSourceDnInfos();
+ DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
+
+ DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
+ .getTargetDnInfos();
+ DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
+
+ StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
+ .getTargetStorageUuids();
+ String[] targetStorageUuids = convert(targetStorageUuidsProto);
+
+ StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
+ .getTargetStorageTypes();
+ StorageType[] convertStorageTypes = convertStorageTypes(
+ targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
+ .getStorageTypesList().size());
+
+ List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
+ .getLiveBlockIndicesList();
+ short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
+ for (int i = 0; i < liveBlockIndicesList.size(); i++) {
+ liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
+ }
+
+ ErasureCodingPolicy ecPolicy =
+ convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy());
+
+ return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
+ targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
+ }
+
+ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
+ BlockECRecoveryInfo blockEcRecoveryInfo) {
+ BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
+ .newBuilder();
- builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock()));
++ builder.setBlock(PBHelperClient.convert(
++ blockEcRecoveryInfo.getExtendedBlock()));
+
+ DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
+ builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+ DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
+ builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+ String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
+ builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
+
+ StorageType[] targetStorageTypes = blockEcRecoveryInfo
+ .getTargetStorageTypes();
+ builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+ short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
+ builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+
+ builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo
+ .getErasureCodingPolicy()));
+
+ return builder.build();
+ }
+
+ private static List<Integer> convertIntArray(short[] liveBlockIndices) {
+ List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
+ for (short s : liveBlockIndices) {
+ liveBlockIndicesList.add((int) s);
+ }
+ return liveBlockIndicesList;
+ }
+
+ private static StorageTypesProto convertStorageTypesProto(
+ StorageType[] targetStorageTypes) {
+ StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
+ for (StorageType storageType : targetStorageTypes) {
- builder.addStorageTypes(convertStorageType(storageType));
++ builder.addStorageTypes(PBHelperClient.convertStorageType(storageType));
+ }
+ return builder.build();
+ }
+
+ private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
+ StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
+ for (String storageUuid : targetStorageIDs) {
+ builder.addStorageUuids(storageUuid);
+ }
+ return builder.build();
+ }
+
+ private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
+ DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
+ for (DatanodeInfo datanodeInfo : dnInfos) {
- builder.addDatanodes(convert(datanodeInfo));
++ builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
+ }
+ return builder.build();
+ }
+
+ private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
+ List<String> storageUuidsList = targetStorageUuidsProto
+ .getStorageUuidsList();
+ String[] storageUuids = new String[storageUuidsList.size()];
+ for (int i = 0; i < storageUuidsList.size(); i++) {
+ storageUuids[i] = storageUuidsList.get(i);
+ }
+ return storageUuids;
+ }
+
+ public static BlockECRecoveryCommandProto convert(
+ BlockECRecoveryCommand blkECRecoveryCmd) {
+ BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
+ .newBuilder();
+ Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
+ .getECTasks();
+ for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
+ builder
+ .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
+ }
+ return builder.build();
+ }
+
+ public static BlockECRecoveryCommand convert(
+ BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
+ Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
+ List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
+ .getBlockECRecoveryinfoList();
+ for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
+ blkECRecoveryInfos
+ .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
+ }
+ return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+ blkECRecoveryInfos);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 555f506,be1a9ef..07c3c01
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@@ -17,8 -17,7 +17,8 @@@
*/
package org.apache.hadoop.hdfs.server.balancer;
- import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+ import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@@ -64,10 -62,8 +63,9 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
- import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
@@@ -857,10 -794,14 +854,14 @@@ public class Dispatcher
if (shouldFetchMoreBlocks()) {
// fetch new blocks
try {
- blocksToReceive -= getBlockList();
+ final long received = getBlockList();
+ if (received == 0) {
+ return;
+ }
+ blocksToReceive -= received;
continue;
} catch (IOException e) {
- LOG.warn("Exception while getting block list", e);
+ LOG.warn("Exception while getting reportedBlock list", e);
return;
}
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index 928424b,95d9983..2f214be
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@@ -55,12 -55,6 +55,12 @@@ public interface BlockCollection
public long getPreferredBlockSize();
/**
+ * Get block replication for the collection.
+ * @return block replication value. Return 0 if the file is erasure coded.
+ */
+ public short getPreferredBlockReplication();
+
- /**
++ /**
* @return the storage policy ID.
*/
public byte getStoragePolicyID();
@@@ -88,7 -81,7 +88,12 @@@
public boolean isUnderConstruction();
/**
+ * @return whether the block collection is in striping format
+ */
- public boolean isStriped();
++ boolean isStriped();
++
++ /**
+ * @return the id for the block collection
+ */
+ long getId();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index f440e14,706cbcd..dc296ac
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@@ -19,25 -19,39 +19,38 @@@ package org.apache.hadoop.hdfs.server.b
import java.io.IOException;
import java.util.LinkedList;
+ import java.util.List;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.LightWeightGSet;
+ import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
+
/**
- * BlockInfo class maintains for a given block
- * the {@link BlockCollection} it is part of and datanodes where the replicas of
- * the block are stored.
+ * For a given block (or an erasure coding block group), BlockInfo class
+ * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
+ * where the replicas of the block, or blocks belonging to the erasure coding
+ * block group, are stored.
*/
-@InterfaceAudience.Private
-public abstract class BlockInfo extends Block
+public abstract class BlockInfo extends Block
implements LightWeightGSet.LinkedElement {
+
public static final BlockInfo[] EMPTY_ARRAY = {};
- private BlockCollection bc;
+ /**
+ * Replication factor.
+ */
+ private short replication;
+
+ /**
+ * Block collection ID.
+ */
+ private long bcId;
- /** For implementing {@link LightWeightGSet.LinkedElement} interface */
+ /** For implementing {@link LightWeightGSet.LinkedElement} interface. */
private LightWeightGSet.LinkedElement nextLinkedElement;
/**
@@@ -58,26 -72,48 +71,40 @@@
/**
* Construct an entry for blocksmap
- * @param replication the block's replication factor
+ * @param size the block's replication factor, or the total number of blocks
+ * in the block group
*/
- public BlockInfo(short replication) {
- this.triplets = new Object[3*replication];
+ public BlockInfo(short size) {
+ this.triplets = new Object[3 * size];
- this.bc = null;
+ this.bcId = INVALID_INODE_ID;
- this.replication = replication;
++ this.replication = isStriped() ? 0 : size;
}
- public BlockInfo(Block blk, short replication) {
+ public BlockInfo(Block blk, short size) {
super(blk);
- this.triplets = new Object[3 * size];
- this.bc = null;
- this.triplets = new Object[3*replication];
++ this.triplets = new Object[3*size];
+ this.bcId = INVALID_INODE_ID;
- this.replication = replication;
- }
-
- /**
- * Copy construction.
- * @param from BlockInfo to copy from.
- */
- protected BlockInfo(BlockInfo from) {
- this(from, from.getReplication());
- this.bcId = from.bcId;
++ this.replication = isStriped() ? 0 : size;
+ }
+
+ public short getReplication() {
+ return replication;
}
- public BlockCollection getBlockCollection() {
- return bc;
+ public void setReplication(short repl) {
+ this.replication = repl;
}
- public void setBlockCollection(BlockCollection bc) {
- this.bc = bc;
+ public long getBlockCollectionId() {
+ return bcId;
+ }
+
+ public void setBlockCollectionId(long id) {
+ this.bcId = id;
+ }
+
+ public boolean isDeleted() {
+ return bcId == INVALID_INODE_ID;
}
public DatanodeDescriptor getDatanode(int index) {
@@@ -342,7 -363,7 +365,8 @@@
public void convertToBlockUnderConstruction(BlockUCState s,
DatanodeStorageInfo[] targets) {
if (isComplete()) {
- uc = new BlockUnderConstructionFeature(this, s, targets, this.isStriped());
- uc = new BlockUnderConstructionFeature(this, s, targets);
++ uc = new BlockUnderConstructionFeature(this, s, targets,
++ this.isStriped());
} else {
// the block is already under construction
uc.setBlockUCState(s);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 12b4fd3,42934c3..b9d8486
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@@ -35,17 -36,14 +35,6 @@@ public class BlockInfoContiguous extend
}
/**
- * Copy construction. This is used to convert
- * BlockReplicationInfoUnderConstruction
- *
- * @param from BlockReplicationInfo to copy from.
- * Copy construction.
- * @param from BlockInfoContiguous to copy from.
-- */
-- protected BlockInfoContiguous(BlockInfoContiguous from) {
- this(from, (short) (from.triplets.length / 3));
- this.setBlockCollection(from.getBlockCollection());
- super(from);
-- }
--
-- /**
* Ensure that there is enough space to include num more triplets.
* @return first free triplet index.
*/