Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/10/07 05:39:44 UTC
svn commit: r1529735 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
Author: brandonli
Date: Mon Oct 7 03:39:43 2013
New Revision: 1529735
URL: http://svn.apache.org/r1529735
Log:
HDFS-5259. Merging change r1529730 from trunk
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
- copied unchanged from r1529730, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1529735&r1=1529734&r2=1529735&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Mon Oct 7 03:39:43 2013
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.security.InvalidParameterException;
import java.util.EnumSet;
@@ -55,6 +56,7 @@ import org.apache.hadoop.oncrpc.security
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -362,6 +364,30 @@ class OpenFileCtx {
}
}
+ @VisibleForTesting
+ public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
+ long offset = request.getOffset();
+ int count = request.getCount();
+ long smallerCount = offset + count - cachedOffset;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+ + " current offset %d," + " drop the overlapped section (%d-%d)"
+ + " and append new data (%d-%d).", offset, (offset + count - 1),
+ cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ + count - 1)));
+ }
+
+ ByteBuffer data = request.getData();
+ Preconditions.checkState(data.position() == 0,
+ "The write request data has non-zero position");
+ data.position((int) (cachedOffset - offset));
+ Preconditions.checkState(data.limit() - data.position() == smallerCount,
+ "The write request buffer has wrong limit/position regarding count");
+
+ request.setOffset(cachedOffset);
+ request.setCount((int) smallerCount);
+ }
+
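The method above boils down to plain ByteBuffer arithmetic. A minimal standalone sketch, assuming a 100-byte write at offset 0 arrives when the server has already appended through offset 59 (class and variable names are illustrative, not the committed NFS3 request types):

import java.nio.ByteBuffer;

// A minimal sketch of the overlap trimming done by alterWriteRequest().
// OverlapTrimSketch and its locals are illustrative, not the NFS3 types.
public class OverlapTrimSketch {
  public static void main(String[] args) {
    long offset = 0;                       // start offset claimed by the client
    long cachedOffset = 60;                // next expected append offset
    ByteBuffer data = ByteBuffer.wrap(new byte[100]);   // count = 100

    int count = data.remaining();
    long trimmedCount = offset + count - cachedOffset;  // 40 genuinely new bytes
    data.position((int) (cachedOffset - offset));       // skip the 60 old bytes

    // The request would be rewritten as offset=60, count=40.
    System.out.println("offset=" + cachedOffset + " count=" + trimmedCount
        + " remaining=" + data.remaining());            // remaining=40
  }
}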
/**
* Creates and adds a WriteCtx into the pendingWrites map. This is a
* synchronized method to handle concurrent writes.
@@ -374,12 +400,40 @@ class OpenFileCtx {
long offset = request.getOffset();
int count = request.getCount();
long cachedOffset = nextOffset.get();
-
+ int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+
if (LOG.isDebugEnabled()) {
LOG.debug("requesed offset=" + offset + " and current offset="
+ cachedOffset);
}
+ // Handle a special case first
+ if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+ // One Linux client behavior: after a file is closed and reopened to
+ // write, the client sometimes combines previously written data (which
+ // could still be in the kernel buffer) with newly appended data in one
+ // write. This is usually the first write after the file is reopened.
+ // In this case, we log the event and drop the overlapped section.
+ LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+ + " current offset %d," + " drop the overlapped section (%d-%d)"
+ + " and append new data (%d-%d).", offset, (offset + count - 1),
+ cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ + count - 1)));
+
+ if (!pendingWrites.isEmpty()) {
+ LOG.warn("There are other pending writes, fail this jumbo write");
+ return null;
+ }
+
+ LOG.warn("Modify this write to write only the appended data");
+ alterWriteRequest(request, cachedOffset);
+
+ // Update local variable
+ originalCount = count;
+ offset = request.getOffset();
+ count = request.getCount();
+ }
+
// Fail non-append call
if (offset < cachedOffset) {
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
@@ -389,8 +443,9 @@ class OpenFileCtx {
DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
: WriteCtx.DataState.ALLOW_DUMP;
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
- request.getOffset(), request.getCount(), request.getStableHow(),
- request.getData().array(), channel, xid, false, dataState);
+ request.getOffset(), request.getCount(), originalCount,
+ request.getStableHow(), request.getData(), channel, xid, false,
+ dataState);
if (LOG.isDebugEnabled()) {
LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+ " and requesed offset=" + offset);
@@ -421,8 +476,7 @@ class OpenFileCtx {
WRITE3Response response;
long cachedOffset = nextOffset.get();
if (offset + count > cachedOffset) {
- LOG.warn("Haven't noticed any partial overwrite for a sequential file"
- + " write requests. Treat it as a real random write, no support.");
+ LOG.warn("Treat this jumbo write as a real random write, no support.");
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
} else {
@@ -641,6 +695,7 @@ class OpenFileCtx {
private void addWrite(WriteCtx writeCtx) {
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
+ // For the offset range (min, max), min is inclusive, and max is exclusive
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
}
@@ -753,19 +808,7 @@ class OpenFileCtx {
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
WriteStableHow stableHow = writeCtx.getStableHow();
- byte[] data = null;
- try {
- data = writeCtx.getData();
- } catch (Exception e1) {
- LOG.error("Failed to get request data offset:" + offset + " count:"
- + count + " error:" + e1);
- // Cleanup everything
- cleanup();
- return;
- }
- Preconditions.checkState(data.length == count);
-
FileHandle handle = writeCtx.getHandle();
if (LOG.isDebugEnabled()) {
LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
@@ -774,8 +817,8 @@ class OpenFileCtx {
try {
// The write is not protected by lock. asyncState is used to make sure
- // there is one thread doing write back at any time
- fos.write(data, 0, count);
+ // there is one thread doing write back at any time
+ writeCtx.writeData(fos);
long flushedOffset = getFlushedOffset();
if (flushedOffset != (offset + count)) {
@@ -784,10 +827,6 @@ class OpenFileCtx {
+ (offset + count));
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("After writing " + handle.getFileId() + " at offset "
- + offset + ", update the memory count.");
- }
// Reduce memory occupation size if request was allowed dumped
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
@@ -795,6 +834,11 @@ class OpenFileCtx {
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
updateNonSequentialWriteInMemory(-count);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After writing " + handle.getFileId() + " at offset "
+ + offset + ", updated the memory count, new value:"
+ + nonSequentialWriteInMemory.get());
+ }
}
}
}
@@ -802,6 +846,11 @@ class OpenFileCtx {
if (!writeCtx.getReplied()) {
WccAttr preOpAttr = latestAttr.getWccAttr();
WccData fileWcc = new WccData(preOpAttr, latestAttr);
+ if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+ LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+ + " instead of real data count:" + count);
+ count = writeCtx.getOriginalCount();
+ }
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
@@ -809,7 +858,7 @@ class OpenFileCtx {
}
} catch (IOException e) {
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
- + offset + " and length " + data.length, e);
+ + offset + " and length " + count, e);
if (!writeCtx.getReplied()) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
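The originalCount bookkeeping in the last hunk exists so the reply stays consistent with what the client sent: if a 100-byte jumbo write was trimmed to 40 appended bytes, replying count=40 would look like a short write to the client. A minimal sketch of that decision, assuming -1 marks an unmodified write as WriteCtx.INVALID_ORIGINAL_COUNT does (class and method names here are illustrative):

public class ReplyCountSketch {
  static final int INVALID_ORIGINAL_COUNT = -1;

  // For a trimmed overlapped write, acknowledge the client's original count
  // so the client does not observe a short write; otherwise echo the real one.
  static int replyCount(int count, int originalCount) {
    return originalCount != INVALID_ORIGINAL_COUNT ? originalCount : count;
  }

  public static void main(String[] args) {
    System.out.println(replyCount(40, 100));                     // prints 100
    System.out.println(replyCount(40, INVALID_ORIGINAL_COUNT));  // prints 40
  }
}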
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1529735&r1=1529734&r2=1529735&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Mon Oct 7 03:39:43 2013
@@ -20,13 +20,16 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.jboss.netty.channel.Channel;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -50,8 +53,17 @@ class WriteCtx {
private final FileHandle handle;
private final long offset;
private final int count;
+
+ // Only needed for an overlapped write; see OpenFileCtx.addWritesToCache()
+ private final int originalCount;
+ public static final int INVALID_ORIGINAL_COUNT = -1;
+
+ public int getOriginalCount() {
+ return originalCount;
+ }
+
private final WriteStableHow stableHow;
- private volatile byte[] data;
+ private volatile ByteBuffer data;
private final Channel channel;
private final int xid;
@@ -89,9 +101,13 @@ class WriteCtx {
}
return 0;
}
+
+ // Resized write should not allow dump
+ Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT);
+
this.raf = raf;
dumpFileOffset = dumpOut.getChannel().position();
- dumpOut.write(data, 0, count);
+ dumpOut.write(data.array(), 0, count);
if (LOG.isDebugEnabled()) {
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
}
@@ -127,7 +143,8 @@ class WriteCtx {
return stableHow;
}
- byte[] getData() throws IOException {
+ @VisibleForTesting
+ ByteBuffer getData() throws IOException {
if (dataState != DataState.DUMPED) {
synchronized (this) {
if (dataState != DataState.DUMPED) {
@@ -143,15 +160,45 @@ class WriteCtx {
private void loadData() throws IOException {
Preconditions.checkState(data == null);
- data = new byte[count];
+ byte[] rawData = new byte[count];
raf.seek(dumpFileOffset);
- int size = raf.read(data, 0, count);
+ int size = raf.read(rawData, 0, count);
if (size != count) {
throw new IOException("Data count is " + count + ", but read back "
+ size + "bytes");
}
+ data = ByteBuffer.wrap(rawData);
}
+ public void writeData(HdfsDataOutputStream fos) throws IOException {
+ Preconditions.checkState(fos != null);
+
+ ByteBuffer dataBuffer = null;
+ try {
+ dataBuffer = getData();
+ } catch (Exception e1) {
+ LOG.error("Failed to get request data offset:" + offset + " count:"
+ + count + " error:" + e1);
+ throw new IOException("Can't get WriteCtx.data");
+ }
+
+ byte[] data = dataBuffer.array();
+ int position = dataBuffer.position();
+ int limit = dataBuffer.limit();
+ Preconditions.checkState(limit - position == count);
+ // Modified write has a valid original count
+ if (position != 0) {
+ if (limit != getOriginalCount()) {
+ throw new IOException("Modified write has differnt original size."
+ + "buff position:" + position + " buff limit:" + limit + ". "
+ + toString());
+ }
+ }
+
+ // Now write data
+ fos.write(data, position, count);
+ }
+
Channel getChannel() {
return channel;
}
@@ -168,11 +215,13 @@ class WriteCtx {
this.replied = replied;
}
- WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
- byte[] data, Channel channel, int xid, boolean replied, DataState dataState) {
+ WriteCtx(FileHandle handle, long offset, int count, int originalCount,
+ WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid,
+ boolean replied, DataState dataState) {
this.handle = handle;
this.offset = offset;
this.count = count;
+ this.originalCount = originalCount;
this.stableHow = stableHow;
this.data = data;
this.channel = channel;
@@ -185,7 +234,7 @@ class WriteCtx {
@Override
public String toString() {
return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
- + " stableHow:" + stableHow + " replied:" + replied + " dataState:"
- + dataState + " xid:" + xid;
+ + " originalCount:" + originalCount + " stableHow:" + stableHow
+ + " replied:" + replied + " dataState:" + dataState + " xid:" + xid;
}
}
\ No newline at end of file
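The writeData() method added above is the other half of the trimming: once alterWriteRequest() has advanced the buffer position, writing array() from index 0 would resend the overlapped bytes, so the write must start at position(). A minimal sketch, with ByteArrayOutputStream standing in for HdfsDataOutputStream:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Sketch: writing a trimmed ByteBuffer. Only the position/count handling is
// the point; ByteArrayOutputStream stands in for HdfsDataOutputStream.
public class TrimmedWriteSketch {
  public static void main(String[] args) {
    ByteBuffer data = ByteBuffer.wrap("oldoldNEW".getBytes());
    data.position(6);                            // first 6 bytes already written
    int count = data.limit() - data.position();  // 3 bytes of new data

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(data.array(), data.position(), count);  // writes only "NEW"
    System.out.println(out);                          // prints NEW
  }
}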
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1529735&r1=1529734&r2=1529735&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Oct 7 03:39:43 2013
@@ -162,6 +162,9 @@ Release 2.1.2 - UNRELEASED
HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
(Vinay via jing9)
+ HDFS-5259. Support clients that combine appended data with old data
+ before sending it to the NFS server. (brandonli)
+
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES