You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/11/07 19:02:38 UTC

svn commit: r1539740 - in /hadoop/common/trunk/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/

Author: brandonli
Date: Thu Nov  7 18:02:37 2013
New Revision: 1539740

URL: http://svn.apache.org/r1539740
Log:
HDFS-5252. Stable write is not handled correctly in someplace. Contributed by Brandon Li

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Thu Nov  7 18:02:37 2013
@@ -109,6 +109,12 @@ public class Nfs3Utils {
    * Send a write response to the netty network socket channel
    */
   public static void writeChannel(Channel channel, XDR out, int xid) {
+    if (channel == null) {
+      RpcProgramNfs3.LOG
+          .info("Null channel should only happen in tests. Do nothing.");
+      return;
+    }
+    
     if (RpcProgramNfs3.LOG.isDebugEnabled()) {
       RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Thu Nov  7 18:02:37 2013
@@ -1007,6 +1007,23 @@ class OpenFileCtx {
       }
       
       if (!writeCtx.getReplied()) {
+        if (stableHow != WriteStableHow.UNSTABLE) {
+          LOG.info("Do sync for stable write:" + writeCtx);
+          try {
+            if (stableHow == WriteStableHow.DATA_SYNC) {
+              fos.hsync();
+            } else {
+              Preconditions.checkState(stableHow == WriteStableHow.FILE_SYNC,
+                  "Unknown WriteStableHow:" + stableHow);
+              // Sync file data and length
+              fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+            }
+          } catch (IOException e) {
+            LOG.error("hsync failed with writeCtx:" + writeCtx + " error:" + e);
+            throw e;
+          }
+        }
+        
         WccAttr preOpAttr = latestAttr.getWccAttr();
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
         if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Nov  7 18:02:37 2013
@@ -126,6 +126,8 @@ import org.jboss.netty.buffer.ChannelBuf
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * RPC program corresponding to nfs daemon. See {@link Nfs3}.
  */
@@ -1975,4 +1977,9 @@ public class RpcProgramNfs3 extends RpcP
     }
     return true;
   }
+  
+  @VisibleForTesting
+  WriteManager getWriteManager() {
+    return this.writeManager;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Nov  7 18:02:37 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.oncrpc.security
 import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 
 /**
@@ -262,6 +263,11 @@ public class WriteManager {
     }
     return attr;
   }
+   
+  @VisibleForTesting
+  ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
+    return this.openFileMap;
+  }
   
   /**
    * StreamMonitor wakes up periodically to find and closes idle streams.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Thu Nov  7 18:02:37 2013
@@ -17,21 +17,41 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
+import org.apache.hadoop.nfs.nfs3.request.READ3Request;
+import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
+import org.apache.hadoop.nfs.nfs3.response.READ3Response;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -105,7 +125,7 @@ public class TestWrites {
     Assert.assertTrue(limit - position == 1);
     Assert.assertTrue(appendedData.get(position) == (byte) 19);
   }
-  
+
   @Test
   // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
   // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
@@ -162,4 +182,117 @@ public class TestWrites {
     ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
   }
+
+  private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
+      throws InterruptedException {
+    int waitedTime = 0;
+    ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
+        .getOpenFileMap();
+    OpenFileCtx ctx = openFileMap.get(handle);
+    assertTrue(ctx != null);
+    do {
+      Thread.sleep(3000);
+      waitedTime += 3000;
+      if (ctx.getPendingWritesForTest().size() == 0) {
+        return;
+      }
+    } while (waitedTime < maxWaitTime);
+
+    fail("Write can't finish.");
+  }
+
+  @Test
+  public void testWriteStableHow() throws IOException, InterruptedException {
+    HdfsConfiguration config = new HdfsConfiguration();
+    DFSClient client = null;
+    MiniDFSCluster cluster = null;
+    RpcProgramNfs3 nfsd;
+    SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(
+        System.getProperty("user.name"));
+
+    try {
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      client = new DFSClient(NameNode.getAddress(config), config);
+
+      // Start nfs
+      List<String> exports = new ArrayList<String>();
+      exports.add("/");
+      Nfs3 nfs3 = new Nfs3(exports, config);
+      nfs3.start(false);
+      nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+      HdfsFileStatus status = client.getFileInfo("/");
+      FileHandle rootHandle = new FileHandle(status.getFileId());
+      // Create file1
+      CREATE3Request createReq = new CREATE3Request(rootHandle, "file1",
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr = new XDR();
+      createReq.serialize(createXdr);
+      CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+      FileHandle handle = createRsp.getObjHandle();
+
+      // Test DATA_SYNC
+      byte[] buffer = new byte[10];
+      for (int i = 0; i < 10; i++) {
+        buffer[i] = (byte) i;
+      }
+      WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
+          WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
+      XDR writeXdr = new XDR();
+      writeReq.serialize(writeXdr);
+      nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+          InetAddress.getLocalHost());
+
+      waitWrite(nfsd, handle, 60000);
+
+      // Readback
+      READ3Request readReq = new READ3Request(handle, 0, 10);
+      XDR readXdr = new XDR();
+      readReq.serialize(readXdr);
+      READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+
+      assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
+
+      // Test FILE_SYNC
+
+      // Create file2
+      CREATE3Request createReq2 = new CREATE3Request(rootHandle, "file2",
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr2 = new XDR();
+      createReq2.serialize(createXdr2);
+      CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+      FileHandle handle2 = createRsp2.getObjHandle();
+
+      WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
+          WriteStableHow.FILE_SYNC, ByteBuffer.wrap(buffer));
+      XDR writeXdr2 = new XDR();
+      writeReq2.serialize(writeXdr2);
+      nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
+          InetAddress.getLocalHost());
+
+      waitWrite(nfsd, handle2, 60000);
+
+      // Readback
+      READ3Request readReq2 = new READ3Request(handle2, 0, 10);
+      XDR readXdr2 = new XDR();
+      readReq2.serialize(readXdr2);
+      READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+
+      assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
+      // FILE_SYNC should sync the file size
+      status = client.getFileInfo("/file2");
+      assertTrue(status.getLen() == 10);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov  7 18:02:37 2013
@@ -588,6 +588,8 @@ Release 2.2.1 - UNRELEASED
     HDFS-5458. Datanode failed volume threshold ignored if exception is thrown
     in getDataDirsFromURIs. (Mike Mellenthin via wang)
 
+    HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES