You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:30 UTC

[30/58] [abbrv] hadoop git commit: HDFS-9092. Nfs silently drops overlapping write requests and causes data copying to fail. Contributed by Yongjun Zhang.

HDFS-9092. Nfs silently drops overlapping write requests and causes data copying to fail. Contributed by Yongjun Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/151fca50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/151fca50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/151fca50

Branch: refs/heads/HDFS-7285
Commit: 151fca5032719e561226ef278e002739073c23ec
Parents: 5c3b663
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Mon Sep 28 18:45:00 2015 -0700
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Mon Sep 28 18:45:00 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/nfs/nfs3/OffsetRange.java       |   4 +
 .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java       | 141 +++++++++++--------
 .../apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java   |  82 ++++++++++-
 .../apache/hadoop/hdfs/nfs/nfs3/TestWrites.java |  92 +++++++++++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 5 files changed, 260 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
index f02dcc0..764524a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
@@ -70,4 +70,8 @@ public class OffsetRange {
     }
     return false;
   }
+
+  public String toString() {
+    return "[" + getMin() + ", " + getMax() + ")";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index 9610f48..9371a72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -490,11 +490,11 @@ class OpenFileCtx {
     int count = request.getCount();
     long smallerCount = offset + count - cachedOffset;
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
-          + " current offset %d," + " drop the overlapped section (%d-%d)"
-          + " and append new data (%d-%d).", offset, (offset + count - 1),
-          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
-              + count - 1)));
+      LOG.debug(String.format("Got overwrite with appended data [%d-%d),"
+          + " current offset %d," + " drop the overlapped section [%d-%d)"
+          + " and append new data [%d-%d).", offset, (offset + count),
+          cachedOffset, offset, cachedOffset, cachedOffset, (offset
+              + count)));
     }
     
     ByteBuffer data = request.getData();
@@ -508,6 +508,22 @@ class OpenFileCtx {
     request.setCount((int) smallerCount);
   }
   
+  @VisibleForTesting
+  private static void trimWriteRequest(WriteCtx writeCtx,
+      long currentOffset) {
+    long offset = writeCtx.getOffset();
+    if (LOG.isDebugEnabled()) {
+      int count = writeCtx.getCount();
+      LOG.debug(String.format("Trim request [%d-%d),"
+          + " current offset %d," + " drop the overlapped section [%d-%d)"
+          + " and write new data [%d-%d)",
+          offset, (offset + count),
+          currentOffset, offset, (currentOffset),
+          currentOffset, (offset + count)));
+    }
+    writeCtx.trimWrite((int)(currentOffset - offset));
+  }
+
   /**
    * Creates and adds a WriteCtx into the pendingWrites map. This is a
    * synchronized method to handle concurrent writes.
@@ -527,23 +543,27 @@ class OpenFileCtx {
           + cachedOffset);
     }
 
-    // Handle a special case first
+    // Ignore write request with range below the current offset
+    if (offset + count <= cachedOffset) {
+      LOG.warn(String.format("Got overwrite [%d-%d) smaller than"
+          + " current offset %d," + " drop the request.",
+          offset, (offset + count), cachedOffset));
+      return null;
+    }
+
+    // Handle a special case: trim request whose offset is smaller than
+    // the current offset
     if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
       // One Linux client behavior: after a file is closed and reopened to
       // write, the client sometimes combines previous written data(could still
       // be in kernel buffer) with newly appended data in one write. This is
       // usually the first write after file reopened. In this
       // case, we log the event and drop the overlapped section.
-      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
-          + " current offset %d," + " drop the overlapped section (%d-%d)"
-          + " and append new data (%d-%d).", offset, (offset + count - 1),
-          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
-              + count - 1)));
-
-      if (!pendingWrites.isEmpty()) {
-        LOG.warn("There are other pending writes, fail this jumbo write");
-        return null;
-      }
+      LOG.warn(String.format("Got overwrite with appended data [%d-%d),"
+          + " current offset %d," + " drop the overlapped section [%d-%d)"
+          + " and append new data [%d-%d).", offset, (offset + count),
+          cachedOffset, offset, cachedOffset, cachedOffset, (offset
+              + count)));
       
       LOG.warn("Modify this write to write only the appended data");
       alterWriteRequest(request, cachedOffset);
@@ -1002,45 +1022,56 @@ class OpenFileCtx {
       this.asyncStatus = false;
       return null;
     } 
-    
-      Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
-      OffsetRange range = lastEntry.getKey();
-      WriteCtx toWrite = lastEntry.getValue();
-      
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
-            + nextOffset);
+
+    Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+    OffsetRange range = lastEntry.getKey();
+    WriteCtx toWrite = lastEntry.getValue();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+          + nextOffset);
+    }
+
+    long offset = nextOffset.get();
+    if (range.getMin() > offset) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The next sequential write has not arrived yet");
       }
-      
-      long offset = nextOffset.get();
-      if (range.getMin() > offset) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The next sequential write has not arrived yet");
-        }
-        processCommits(nextOffset.get()); // handle race
-        this.asyncStatus = false;
-      } else if (range.getMin() < offset && range.getMax() > offset) {
-        // shouldn't happen since we do sync for overlapped concurrent writers
-        LOG.warn("Got an overlapping write (" + range.getMin() + ", "
-            + range.getMax() + "), nextOffset=" + offset
-            + ". Silently drop it now");
-        pendingWrites.remove(range);
-        processCommits(nextOffset.get()); // handle race
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
-              + ") from the list");
-        }
-        // after writing, remove the WriteCtx from cache 
-        pendingWrites.remove(range);
-        // update nextOffset
-        nextOffset.addAndGet(toWrite.getCount());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Change nextOffset to " + nextOffset.get());
-        }
-        return toWrite;
+      processCommits(nextOffset.get()); // handle race
+      this.asyncStatus = false;
+    } else if (range.getMax() <= offset) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Remove write " + range.toString()
+            + " which is already written from the list");
       }
-    
+      // remove the WriteCtx from cache
+      pendingWrites.remove(range);
+    } else if (range.getMin() < offset && range.getMax() > offset) {
+      LOG.warn("Got an overlapping write " + range.toString()
+          + ", nextOffset=" + offset
+          + ". Remove and trim it");
+      pendingWrites.remove(range);
+      trimWriteRequest(toWrite, offset);
+      // update nextOffset
+      nextOffset.addAndGet(toWrite.getCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Change nextOffset (after trim) to " + nextOffset.get());
+      }
+      return toWrite;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Remove write " + range.toString()
+            + " from the list");
+      }
+      // after writing, remove the WriteCtx from cache
+      pendingWrites.remove(range);
+      // update nextOffset
+      nextOffset.addAndGet(toWrite.getCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Change nextOffset to " + nextOffset.get());
+      }
+      return toWrite;
+    }
     return null;
   }
   
@@ -1272,8 +1303,8 @@ class OpenFileCtx {
     WccAttr preOpAttr = latestAttr.getWccAttr();
     while (!pendingWrites.isEmpty()) {
       OffsetRange key = pendingWrites.firstKey();
-      LOG.info("Fail pending write: (" + key.getMin() + ", " + key.getMax()
-          + "), nextOffset=" + nextOffset.get());
+      LOG.info("Fail pending write: " + key.toString()
+          + ", nextOffset=" + nextOffset.get());
       
       WriteCtx writeCtx = pendingWrites.remove(key);
       if (!writeCtx.getReplied()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
index 82c826f..8c2c7ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
@@ -51,8 +51,8 @@ class WriteCtx {
   }
 
   private final FileHandle handle;
-  private final long offset;
-  private final int count;
+  private long offset;
+  private int count;
   
   /**
    * Some clients can send a write that includes previously written data along
@@ -61,13 +61,61 @@ class WriteCtx {
    * request before it was modified to write only the new data. 
    * @see OpenFileCtx#addWritesToCache for more details
    */
-  private final int originalCount; 
+  private int originalCount;
   public static final int INVALID_ORIGINAL_COUNT = -1;
   
+  /**
+   * Overlapping Write Request Handling
+   * A write request can be in three states:
+   *   s0. just created, with data != null
+   *   s1. dumped as length "count", and data set to null
+   *   s2. read back from dumped area as length "count"
+   *
+   * Write requests may have overlapping range, we detect this by comparing
+   * the data offset range of the request against the current offset of data
+   * already written to HDFS. There are two categories:
+   *
+   * 1. If the beginning part of a new write request data is already written
+   * due to an earlier request, we alter the new request by trimming this
+   * portion before the new request enters state s0, and the originalCount is
+   * remembered.
+   *
+   * 2. If the lower end of the write request range is beyond the current
+   * offset of data already written, we put the request into cache, and detect
+   * the overlapping when taking the request out from cache.
+   *
+   * For category 2, if we find out that a write request overlaps with another,
+   * this write request is already in state s0, s1, or s2. We trim the
+   * beginning part of this request, by remembering the size of this portion
+   * as trimDelta. So the resulting offset of the write request is
+   * "offset + trimDelta" and the resulting size of the write request is
+   * "count - trimDelta".
+   *
+   * What is important to notice is, if the request is in s1 when we do the
+   * trimming, the data dumped is of size "count", so when we load
+   * the data back from dumped area, we should set the position of the data
+   * buffer to trimDelta.
+   */
+  private int trimDelta;
+
   public int getOriginalCount() {
     return originalCount;
   }
 
+  public void trimWrite(int delta) {
+    Preconditions.checkState(delta < count);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trim write request by delta:" + delta + " " + toString());
+    }
+    synchronized(this) {
+      trimDelta = delta;
+      if (originalCount == INVALID_ORIGINAL_COUNT) {
+        originalCount = count;
+      }
+      trimData();
+    }
+  }
+
   private final WriteStableHow stableHow;
   private volatile ByteBuffer data;
   
@@ -139,11 +187,17 @@ class WriteCtx {
   }
   
   long getOffset() {
-    return offset;
+    synchronized(this) {
+      // See comment "Overlapping Write Request Handling" above
+      return offset + trimDelta;
+    }
   }
 
   int getCount() {
-    return count;
+    synchronized(this) {
+      // See comment "Overlapping Write Request Handling" above
+      return count - trimDelta;
+    }
   }
 
   WriteStableHow getStableHow() {
@@ -174,7 +228,22 @@ class WriteCtx {
       throw new IOException("Data count is " + count + ", but read back "
           + size + "bytes");
     }
-    data = ByteBuffer.wrap(rawData);
+    synchronized(this) {
+      data = ByteBuffer.wrap(rawData);
+      trimData();
+    }
+  }
+
+  private void trimData() {
+    if (data != null && trimDelta > 0) {
+      // make it not dump-able since the data will be used
+      // shortly
+      dataState = DataState.NO_DUMP;
+      data.position(data.position() + trimDelta);
+      offset += trimDelta;
+      count -= trimDelta;
+      trimDelta = 0;
+    }
   }
 
   public void writeData(HdfsDataOutputStream fos) throws IOException {
@@ -229,6 +298,7 @@ class WriteCtx {
     this.offset = offset;
     this.count = count;
     this.originalCount = originalCount;
+    this.trimDelta = 0;
     this.stableHow = stableHow;
     this.data = data;
     this.channel = channel;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index 3c193ae..9c327c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -640,7 +640,97 @@ public class TestWrites {
       }
     }
   }
-  
+
+  @Test
+  public void testOverlappingWrites() throws IOException, InterruptedException {
+    NfsConfiguration config = new NfsConfiguration();
+    MiniDFSCluster cluster = null;
+    RpcProgramNfs3 nfsd;
+    final int bufSize = 32;
+    SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(
+        System.getProperty("user.name"));
+    String currentUser = System.getProperty("user.name");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(currentUser),
+        "*");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(currentUser),
+        "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
+    // Use ephemeral ports in case tests are running in parallel
+    config.setInt("nfs3.mountd.port", 0);
+    config.setInt("nfs3.server.port", 0);
+
+    try {
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+
+      Nfs3 nfs3 = new Nfs3(config);
+      nfs3.startServiceInternal(false);
+      nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+      DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config),
+          config);
+      HdfsFileStatus status = dfsClient.getFileInfo("/");
+      FileHandle rootHandle = new FileHandle(status.getFileId());
+
+      CREATE3Request createReq = new CREATE3Request(rootHandle,
+          "overlapping-writes" + System.currentTimeMillis(),
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr = new XDR();
+      createReq.serialize(createXdr);
+      CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", 1234));
+      FileHandle handle = createRsp.getObjHandle();
+      byte[] buffer = new byte[bufSize];
+      for (int i = 0; i < bufSize; i++) {
+        buffer[i] = (byte) i;
+      }
+      int[][] ranges = new int[][] {
+          {0, 10},
+          {5, 7},
+          {5, 5},
+          {10, 6},
+          {18, 6},
+          {20, 6},
+          {28, 4},
+          {16, 2},
+          {25, 4}
+      };
+      for (int i = 0; i < ranges.length; i++) {
+        int x[] = ranges[i];
+        byte[] tbuffer = new byte[x[1]];
+        for (int j = 0; j < x[1]; j++) {
+          tbuffer[j] = buffer[x[0] + j];
+        }
+        WRITE3Request writeReq = new WRITE3Request(handle, (long)x[0], x[1],
+            WriteStableHow.UNSTABLE, ByteBuffer.wrap(tbuffer));
+        XDR writeXdr = new XDR();
+        writeReq.serialize(writeXdr);
+        nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+            new InetSocketAddress("localhost", 1234));
+      }
+
+      waitWrite(nfsd, handle, 60000);
+      READ3Request readReq = new READ3Request(handle, 0, bufSize);
+      XDR readXdr = new XDR();
+      readReq.serialize(readXdr);
+      READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", config.getInt(
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT)));
+
+      assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testCheckSequential() throws IOException {
     DFSClient dfsClient = Mockito.mock(DFSClient.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3daf8d4..d55beae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1447,6 +1447,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9147. Fix the setting of visibleLength in ExternalBlockReader. (Colin
     P. McCabe via Lei (Eddy) Xu)
 
+    HDFS-9092. Nfs silently drops overlapping write requests and causes data
+    copying to fail. (Yongjun Zhang)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES