You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/11/07 19:02:38 UTC
svn commit: r1539740 - in /hadoop/common/trunk/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
Author: brandonli
Date: Thu Nov 7 18:02:37 2013
New Revision: 1539740
URL: http://svn.apache.org/r1539740
Log:
HDFS-5252. Stable write is not handled correctly in someplace. Contributed by Brandon Li
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Thu Nov 7 18:02:37 2013
@@ -109,6 +109,12 @@ public class Nfs3Utils {
* Send a write response to the netty network socket channel
*/
public static void writeChannel(Channel channel, XDR out, int xid) {
+ if (channel == null) {
+ RpcProgramNfs3.LOG
+ .info("Null channel should only happen in tests. Do nothing.");
+ return;
+ }
+
if (RpcProgramNfs3.LOG.isDebugEnabled()) {
RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Thu Nov 7 18:02:37 2013
@@ -1007,6 +1007,23 @@ class OpenFileCtx {
}
if (!writeCtx.getReplied()) {
+ if (stableHow != WriteStableHow.UNSTABLE) {
+ LOG.info("Do sync for stable write:" + writeCtx);
+ try {
+ if (stableHow == WriteStableHow.DATA_SYNC) {
+ fos.hsync();
+ } else {
+ Preconditions.checkState(stableHow == WriteStableHow.FILE_SYNC,
+ "Unknown WriteStableHow:" + stableHow);
+ // Sync file data and length
+ fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+ }
+ } catch (IOException e) {
+ LOG.error("hsync failed with writeCtx:" + writeCtx + " error:" + e);
+ throw e;
+ }
+ }
+
WccAttr preOpAttr = latestAttr.getWccAttr();
WccData fileWcc = new WccData(preOpAttr, latestAttr);
if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Nov 7 18:02:37 2013
@@ -126,6 +126,8 @@ import org.jboss.netty.buffer.ChannelBuf
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
*/
@@ -1975,4 +1977,9 @@ public class RpcProgramNfs3 extends RpcP
}
return true;
}
+
+ @VisibleForTesting
+ WriteManager getWriteManager() {
+ return this.writeManager;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Nov 7 18:02:37 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.oncrpc.security
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
@@ -262,6 +263,11 @@ public class WriteManager {
}
return attr;
}
+
+ @VisibleForTesting
+ ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
+ return this.openFileMap;
+ }
/**
* StreamMonitor wakes up periodically to find and closes idle streams.
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Thu Nov 7 18:02:37 2013
@@ -17,21 +17,41 @@
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import junit.framework.Assert;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
+import org.apache.hadoop.nfs.nfs3.request.READ3Request;
+import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
+import org.apache.hadoop.nfs.nfs3.response.READ3Response;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.junit.Test;
import org.mockito.Mockito;
@@ -105,7 +125,7 @@ public class TestWrites {
Assert.assertTrue(limit - position == 1);
Assert.assertTrue(appendedData.get(position) == (byte) 19);
}
-
+
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
@@ -162,4 +182,117 @@ public class TestWrites {
ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
+
+ private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
+ throws InterruptedException {
+ int waitedTime = 0;
+ ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
+ .getOpenFileMap();
+ OpenFileCtx ctx = openFileMap.get(handle);
+ assertTrue(ctx != null);
+ do {
+ Thread.sleep(3000);
+ waitedTime += 3000;
+ if (ctx.getPendingWritesForTest().size() == 0) {
+ return;
+ }
+ } while (waitedTime < maxWaitTime);
+
+ fail("Write can't finish.");
+ }
+
+ @Test
+ public void testWriteStableHow() throws IOException, InterruptedException {
+ HdfsConfiguration config = new HdfsConfiguration();
+ DFSClient client = null;
+ MiniDFSCluster cluster = null;
+ RpcProgramNfs3 nfsd;
+ SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+ Mockito.when(securityHandler.getUser()).thenReturn(
+ System.getProperty("user.name"));
+
+ try {
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+ cluster.waitActive();
+ client = new DFSClient(NameNode.getAddress(config), config);
+
+ // Start nfs
+ List<String> exports = new ArrayList<String>();
+ exports.add("/");
+ Nfs3 nfs3 = new Nfs3(exports, config);
+ nfs3.start(false);
+ nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+ HdfsFileStatus status = client.getFileInfo("/");
+ FileHandle rootHandle = new FileHandle(status.getFileId());
+ // Create file1
+ CREATE3Request createReq = new CREATE3Request(rootHandle, "file1",
+ Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+ XDR createXdr = new XDR();
+ createReq.serialize(createXdr);
+ CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+ securityHandler, InetAddress.getLocalHost());
+ FileHandle handle = createRsp.getObjHandle();
+
+ // Test DATA_SYNC
+ byte[] buffer = new byte[10];
+ for (int i = 0; i < 10; i++) {
+ buffer[i] = (byte) i;
+ }
+ WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
+ WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
+ XDR writeXdr = new XDR();
+ writeReq.serialize(writeXdr);
+ nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+ InetAddress.getLocalHost());
+
+ waitWrite(nfsd, handle, 60000);
+
+ // Readback
+ READ3Request readReq = new READ3Request(handle, 0, 10);
+ XDR readXdr = new XDR();
+ readReq.serialize(readXdr);
+ READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+ securityHandler, InetAddress.getLocalHost());
+
+ assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
+
+ // Test FILE_SYNC
+
+ // Create file2
+ CREATE3Request createReq2 = new CREATE3Request(rootHandle, "file2",
+ Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+ XDR createXdr2 = new XDR();
+ createReq2.serialize(createXdr2);
+ CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
+ securityHandler, InetAddress.getLocalHost());
+ FileHandle handle2 = createRsp2.getObjHandle();
+
+ WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
+ WriteStableHow.FILE_SYNC, ByteBuffer.wrap(buffer));
+ XDR writeXdr2 = new XDR();
+ writeReq2.serialize(writeXdr2);
+ nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
+ InetAddress.getLocalHost());
+
+ waitWrite(nfsd, handle2, 60000);
+
+ // Readback
+ READ3Request readReq2 = new READ3Request(handle2, 0, 10);
+ XDR readXdr2 = new XDR();
+ readReq2.serialize(readXdr2);
+ READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
+ securityHandler, InetAddress.getLocalHost());
+
+ assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
+ // FILE_SYNC should sync the file size
+ status = client.getFileInfo("/file2");
+ assertTrue(status.getLen() == 10);
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1539740&r1=1539739&r2=1539740&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 7 18:02:37 2013
@@ -588,6 +588,8 @@ Release 2.2.1 - UNRELEASED
HDFS-5458. Datanode failed volume threshold ignored if exception is thrown
in getDataDirsFromURIs. (Mike Mellenthin via wang)
+ HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES