You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2014/08/28 00:23:49 UTC
[22/22] git commit: HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol. (Arpit Agarwal)
HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8612590d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8612590d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8612590d
Branch: refs/heads/HDFS-6581
Commit: 8612590d896d4debd9f0cb83a6d99832512fbe00
Parents: 43ccda5
Author: arp <ar...@apache.org>
Authored: Wed Aug 27 15:13:20 2014 -0700
Committer: arp <ar...@apache.org>
Committed: Wed Aug 27 15:23:02 2014 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt | 3 +++
.../java/org/apache/hadoop/hdfs/DFSOutputStream.java | 11 +++++++----
.../hdfs/protocol/datatransfer/DataTransferProtocol.java | 4 ++--
.../hadoop/hdfs/protocol/datatransfer/Receiver.java | 3 ++-
.../apache/hadoop/hdfs/protocol/datatransfer/Sender.java | 6 ++++--
.../org/apache/hadoop/hdfs/server/datanode/DataNode.java | 3 ++-
.../apache/hadoop/hdfs/server/datanode/DataXceiver.java | 6 ++++--
.../hadoop-hdfs/src/main/proto/datatransfer.proto | 7 +++++++
.../org/apache/hadoop/hdfs/TestDataTransferProtocol.java | 2 +-
.../hadoop/hdfs/server/datanode/TestDiskError.java | 2 +-
10 files changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index 1f2bf64..8854e07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -6,4 +6,7 @@
HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
edit logs. (Arpit Agarwal)
+
+ HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
+ (Arpit Agarwal)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 14977a2..c255bf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -344,6 +344,7 @@ public class DFSOutputStream extends FSOutputSummer
private long restartDeadline = 0; // Deadline of DN restart
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
+ private final boolean isLazyPersistFile;
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@@ -358,8 +359,9 @@ public class DFSOutputStream extends FSOutputSummer
/**
* Default construction for file create
*/
- private DataStreamer() {
+ private DataStreamer(HdfsFileStatus stat) {
isAppend = false;
+ isLazyPersistFile = stat.isLazyPersist();
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@@ -377,6 +379,7 @@ public class DFSOutputStream extends FSOutputSummer
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
+ isLazyPersistFile = stat.isLazyPersist();
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -1352,7 +1355,7 @@ public class DFSOutputStream extends FSOutputSummer
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
- cachingStrategy.get());
+ cachingStrategy.get(), isLazyPersistFile);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1601,7 +1604,7 @@ public class DFSOutputStream extends FSOutputSummer
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
- streamer = new DataStreamer();
+ streamer = new DataStreamer(stat);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@@ -1650,7 +1653,7 @@ public class DFSOutputStream extends FSOutputSummer
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
- streamer = new DataStreamer();
+ streamer = new DataStreamer(stat);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index d54d5be..f6b99e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -106,8 +106,8 @@ public interface DataTransferProtocol {
final long maxBytesRcvd,
final long latestGenerationStamp,
final DataChecksum requestedChecksum,
- final CachingStrategy cachingStrategy) throws IOException;
-
+ final CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException;
/**
* Transfer a block to another datanode.
* The block stage must be
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index a09437c..78693bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -137,7 +137,8 @@ public abstract class Receiver implements DataTransferProtocol {
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
- CachingStrategy.newDefaultStrategy()));
+ CachingStrategy.newDefaultStrategy()),
+ (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
}
/** Receive {@link Op#TRANSFER_BLOCK} */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 68da523..4298bb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -124,7 +124,8 @@ public class Sender implements DataTransferProtocol {
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
- final CachingStrategy cachingStrategy) throws IOException {
+ final CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@@ -142,7 +143,8 @@ public class Sender implements DataTransferProtocol {
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto)
- .setCachingStrategy(getCachingStrategy(cachingStrategy));
+ .setCachingStrategy(getCachingStrategy(cachingStrategy))
+ .setAllowLazyPersist(allowLazyPersist);
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1ec91d0..e86ea0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1809,7 +1809,8 @@ public class DataNode extends Configured
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode,
- stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
+ stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
+ false);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 4575c93..3b8304e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -544,7 +544,8 @@ class DataXceiver extends Receiver implements Runnable {
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
- CachingStrategy cachingStrategy) throws IOException {
+ CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@@ -648,10 +649,11 @@ class DataXceiver extends Receiver implements Runnable {
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
+ // Do not propagate allowLazyPersist to downstream DataNodes.
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- latestGenerationStamp, requestedChecksum, cachingStrategy);
+ latestGenerationStamp, requestedChecksum, cachingStrategy, false);
mirrorOut.flush();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 6283b56..13747ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -109,6 +109,13 @@ message OpWriteBlockProto {
optional CachingStrategyProto cachingStrategy = 10;
optional StorageTypeProto storageType = 11 [default = DISK];
repeated StorageTypeProto targetStorageTypes = 12;
+
+ /**
+ * Hint to the DataNode that the block can be allocated on transient
+ * storage i.e. memory and written to disk lazily. The DataNode is free
+ * to ignore this hint.
+ */
+ optional bool allowLazyPersist = 13 [default = false];
}
message OpTransferBlockProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index bcb68e9..3586551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -524,6 +524,6 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
- checksum, CachingStrategy.newDefaultStrategy());
+ checksum, CachingStrategy.newDefaultStrategy(), false);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8612590d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 4b5b6e1..f440bb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public class TestDiskError {
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
- checksum, CachingStrategy.newDefaultStrategy());
+ checksum, CachingStrategy.newDefaultStrategy(), false);
out.flush();
// close the connection before sending the content of the block