You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/17 23:44:52 UTC

[05/34] git commit: HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol. (Arpit Agarwal)

HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3b7d4715
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3b7d4715
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3b7d4715

Branch: refs/heads/branch-2
Commit: 3b7d4715a10b0215676e166d05a066a8357383e4
Parents: 183f4a4
Author: arp <ar...@apache.org>
Authored: Wed Aug 27 15:13:20 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:00 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSOutputStream.java   | 13 ++++++++-----
 .../protocol/datatransfer/DataTransferProtocol.java    |  4 ++--
 .../hadoop/hdfs/protocol/datatransfer/Receiver.java    |  3 ++-
 .../hadoop/hdfs/protocol/datatransfer/Sender.java      |  6 ++++--
 .../apache/hadoop/hdfs/server/datanode/DataNode.java   |  3 ++-
 .../hadoop/hdfs/server/datanode/DataXceiver.java       |  6 ++++--
 .../hadoop-hdfs/src/main/proto/datatransfer.proto      |  7 +++++++
 .../apache/hadoop/hdfs/TestDataTransferProtocol.java   |  2 +-
 .../hadoop/hdfs/server/datanode/TestDiskError.java     |  2 +-
 9 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index fbe8171..2bcda8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -360,6 +360,7 @@ public class DFSOutputStream extends FSOutputSummer
     private long restartDeadline = 0; // Deadline of DN restart
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
+    private final boolean isLazyPersistFile;
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@@ -377,14 +378,15 @@ public class DFSOutputStream extends FSOutputSummer
      * Default construction for file create
      */
     private DataStreamer() {
-      this(null);
+      this(null, null);
     }
 
     /**
      * construction with tracing info
      */
-    private DataStreamer(Span span) {
+    private DataStreamer(HdfsFileStatus stat, Span span) {
       isAppend = false;
+      isLazyPersistFile = stat.isLazyPersist();
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
       traceSpan = span;
     }
@@ -404,6 +406,7 @@ public class DFSOutputStream extends FSOutputSummer
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
+      isLazyPersistFile = stat.isLazyPersist();
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -1396,7 +1399,7 @@ public class DFSOutputStream extends FSOutputSummer
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy.get());
+              cachingStrategy.get(), isLazyPersistFile);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1649,7 +1652,7 @@ public class DFSOutputStream extends FSOutputSummer
     if (Trace.isTracing()) {
       traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
     }
-    streamer = new DataStreamer(traceSpan);
+    streamer = new DataStreamer(stat, traceSpan);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1726,7 +1729,7 @@ public class DFSOutputStream extends FSOutputSummer
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           checksum.getBytesPerChecksum());
-      streamer = new DataStreamer(traceSpan);
+      streamer = new DataStreamer(stat, traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index d54d5be..f6b99e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -106,8 +106,8 @@ public interface DataTransferProtocol {
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException;
-
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index daae9b7..538c82d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -148,7 +148,8 @@ public abstract class Receiver implements DataTransferProtocol {
           fromProto(proto.getRequestedChecksum()),
           (proto.hasCachingStrategy() ?
               getCachingStrategy(proto.getCachingStrategy()) :
-            CachingStrategy.newDefaultStrategy()));
+            CachingStrategy.newDefaultStrategy()),
+            (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
      } finally {
       if (traceScope != null) traceScope.close();
      }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index fb6cf2c..1ae9da5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -128,7 +128,8 @@ public class Sender implements DataTransferProtocol {
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException {
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -146,7 +147,8 @@ public class Sender implements DataTransferProtocol {
       .setMaxBytesRcvd(maxBytesRcvd)
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
-      .setCachingStrategy(getCachingStrategy(cachingStrategy));
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
+      .setAllowLazyPersist(allowLazyPersist);
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 8337102..a4db56a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2010,7 +2010,8 @@ public class DataNode extends ReconfigurableBase
 
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
+            false);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 4575c93..3b8304e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -544,7 +544,8 @@ class DataXceiver extends Receiver implements Runnable {
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      CachingStrategy cachingStrategy) throws IOException {
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -648,10 +649,11 @@ class DataXceiver extends Receiver implements Runnable {
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
+          // Do not propagate allowLazyPersist to downstream DataNodes.
           new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy);
+              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
 
           mirrorOut.flush();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 098d10a..fb774b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -115,6 +115,13 @@ message OpWriteBlockProto {
   optional CachingStrategyProto cachingStrategy = 10;
   optional StorageTypeProto storageType = 11 [default = DISK];
   repeated StorageTypeProto targetStorageTypes = 12;
+
+  /**
+   * Hint to the DataNode that the block can be allocated on transient
+   * storage i.e. memory and written to disk lazily. The DataNode is free
+   * to ignore this hint.
+   */
+  optional bool allowLazyPersist = 13 [default = false];
 }
   
 message OpTransferBlockProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 494dd1d..6405b5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -530,6 +530,6 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b7d4715/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 4b5b6e1..f440bb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
     out.flush();
 
     // close the connection before sending the content of the block