Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/05/12 08:58:19 UTC

hadoop git commit: HDFS-11674. reserveSpaceForReplicas is not released if append request failed due to mirror down and replica recovered (Contributed by Vinayakumar B)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 28b947603 -> b0b6de38c


HDFS-11674. reserveSpaceForReplicas is not released if append request failed due to mirror down and replica recovered (Contributed by Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b0b6de38
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b0b6de38
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b0b6de38

Branch: refs/heads/branch-2.7
Commit: b0b6de38c97dd7e4241af3c450fc925bf3f7659d
Parents: 28b9476
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri May 12 14:26:28 2017 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Fri May 12 14:26:28 2017 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 22 +++++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  4 ++
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  5 ++
 .../fsdataset/impl/TestRbwSpaceReservation.java | 67 +++++++++++++++++++-
 5 files changed, 100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
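
[Editorial note] In brief: when a client appends to a block, the datanode
reserves on-disk space for the replica being written (RBW) and releases the
reservation once the write finalizes. If the append pipeline fails because a
mirror datanode is down and the replica is instead closed out through block
recovery, that release step was skipped, so the volume's reserved-space
counter leaked. A minimal sketch of the accounting this patch repairs, using
hypothetical stand-in types rather than the real HDFS classes:

// Simplified model of RBW space reservation; all names are illustrative.
public class RbwReservationModel {
  enum ReplicaState { TEMPORARY, RBW, RUR, FINALIZED }

  static class Volume {
    private long reservedForRbw;
    void reserve(long bytes) { reservedForRbw += bytes; }
    void release(long bytes) { reservedForRbw -= bytes; }
    long getReservedForRbw() { return reservedForRbw; }
  }

  static class Replica {
    final Volume volume;
    final long bytesReserved;
    ReplicaState state = ReplicaState.RBW;

    Replica(Volume volume, long bytesReserved) {
      this.volume = volume;
      this.bytesReserved = bytesReserved;
      volume.reserve(bytesReserved);   // reserved when the append starts
    }

    // Normal path: the write pipeline finalizes the replica and the
    // reservation is returned to the volume.
    void finalizeWrite() {
      volume.release(bytesReserved);
      state = ReplicaState.FINALIZED;
    }

    // Recovery path: before this patch the reservation was never returned.
    // This guard mirrors the new code in FsDatasetImpl#initReplicaRecovery.
    void initReplicaRecovery() {
      if (state == ReplicaState.TEMPORARY || state == ReplicaState.RBW) {
        volume.release(bytesReserved); // the fix: release here as well
      }
      state = ReplicaState.RUR;
    }
  }

  public static void main(String[] args) {
    Volume v = new Volume();
    Replica r = new Replica(v, 8192);  // append reserves 8 KB
    r.initReplicaRecovery();           // pipeline failed; replica recovered
    System.out.println(v.getReservedForRbw()); // 0 with the fix; 8192 without
  }
}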


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0b6de38/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5835751..da64e6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -278,6 +278,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-11795. Fix ASF License warnings in branch-2.7.
     (Yiqun Lin via aajisaka)
 
+    HDFS-11674. reserveSpaceForReplicas is not released if append request failed
+    due to mirror down and replica recovered (vinayakumarb)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0b6de38/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a44de61..1272e29 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -426,6 +426,21 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
+    /**
+     * Set up the pipeline for appending to the last block of the file.
+     *
+     * @param lastBlock the last block of the file under construction
+     * @throws IOException if no datanode locations are available
+     */
+    void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException {
+      // setup pipeline to append to the last block XXX retries??
+      setPipeline(lastBlock);
+      if (nodes.length < 1) {
+        throw new IOException("Unable to retrieve block locations"
+            + " for last block " + block + " of file " + src);
+      }
+    }
+
     private void setPipeline(LocatedBlock lb) {
       setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
     }
@@ -2497,6 +2512,13 @@ public class DFSOutputStream extends FSOutputSummer
     return fileId;
   }
 
+  /**
+   * Returns the data streamer object.
+   */
+  protected DataStreamer getStreamer() {
+    return streamer;
+  }
+
   private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
     System.arraycopy(srcs, 0, dsts, 0, skipIndex);
     System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0b6de38/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ec86337..c13b6f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2359,6 +2359,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("initReplicaRecovery: changing replica state for "
           + block + " from " + replica.getState()
           + " to " + rur.getState());
+      if (replica.getState() == ReplicaState.TEMPORARY
+          || replica.getState() == ReplicaState.RBW) {
+        ((ReplicaInPipeline) replica).releaseAllBytesReserved();
+      }
     }
     return rur.createInfo();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0b6de38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d0dba9f..974903b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1355,6 +1355,11 @@ public class DFSTestUtil {
     out.abort();
   }
 
+  public static void setPipeline(DFSOutputStream out, LocatedBlock lastBlock)
+      throws IOException {
+    out.getStreamer().setPipelineInConstruction(lastBlock);
+  }
+
   public static byte[] asArray(ByteBuffer buf) {
     byte arr[] = new byte[buf.remaining()];
     buf.duplicate().get(arr);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0b6de38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
index 026cbd8..24c2e06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
@@ -30,10 +30,13 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -43,6 +46,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -76,9 +80,12 @@ public class TestRbwSpaceReservation {
   private DataNodeFaultInjector old = null;
   private static Random rand = new Random();
 
-  private void initConfig(int blockSize) {
+  @Before
+  public void before() {
     conf = new HdfsConfiguration();
+  }
 
+  private void initConfig(int blockSize) {
     // Refresh disk usage information frequently.
     conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
     conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
@@ -523,4 +530,62 @@ public class TestRbwSpaceReservation {
       }
     }
   }
+
+  @Test(timeout = 60000)
+  public void testReservedSpaceForLeaseRecovery() throws Exception {
+    final short replication = 3;
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
+        1000);
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+    // Write some data to the file and close it cleanly.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[8192]);
+    os.hflush();
+    os.close();
+    /*
+     * Reset the pipeline for the append such that the datanode which is down
+     * is one of the mirrors, not the first datanode.
+     */
+    HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
+        .getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
+    LocatedBlock lastBlock = blockLocation.getLocatedBlock();
+    // Stop the 3rd datanode in the pipeline.
+    cluster.stopDataNode(lastBlock.getLocations()[2].getName());
+    try {
+      os = fs.append(file);
+      DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
+          lastBlock);
+      os.writeBytes("hi");
+      os.hsync();
+    } catch (IOException e) {
+      // Append will fail because datanodes cannot be replaced in a
+      // 3-node cluster.
+      LOG.info("Expected append failure", e);
+    }
+    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+    /*
+     * There is a chance that the stopped DN could be chosen as the primary
+     * for recovery. If so, recovery will not happen in time, so mark the
+     * stopped node as dead to exclude it.
+     */
+    cluster.setDataNodeDead(lastBlock.getLocations()[2]);
+    fs.recoverLease(file);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return fs.isFileClosed(file);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, 500, 30000);
+    checkReservedSpace(0);
+  }
 }
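
[Editorial note] checkReservedSpace(long), called at the end of the new test,
is a pre-existing helper in TestRbwSpaceReservation and is not part of this
diff. A plausible shape for such a check, assuming FsVolumeImpl exposes its
reserved-byte counter via getReservedForRbw() (an assumption; verify against
the actual class), would be:

// Hypothetical sketch only; the real helper lives elsewhere in this test
// class and may differ in detail.
private void checkReservedSpace(final long expectedReserved) throws Exception {
  for (final DataNode dn : cluster.getDataNodes()) {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // Sum bytes still reserved for RBW replicas across the DN's volumes.
        long reserved = 0;
        for (Object v : dn.getFSDataset().getVolumes()) {
          reserved += ((FsVolumeImpl) v).getReservedForRbw();
        }
        return reserved == expectedReserved;
      }
    }, 500, 30000);
  }
}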

