You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/07/31 21:54:18 UTC
svn commit: r226687 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs:
ClientProtocol.java FSNamesystem.java NDFSClient.java NameNode.java
Author: mc
Date: Sun Jul 31 12:54:15 2005
New Revision: 226687
URL: http://svn.apache.org/viewcvs?rev=226687&view=rev
Log:
Fix two problems:
1) Revamp NDFSClientInputStream to make it much less complicated.
This makes it easy to add a bulk read(byte buf[], int off, int len) method.
2) Add a NameNode.abandonFileInProgress() method. The client
calls this if there's a problem during file creation. This
will eliminate the "pendingCreates non-null" exception that
is sometimes seen.
(The problem occurs when the client cannot connect to a datanode
for the first block in the file. In that case we should abandon the entire
file, not just the block, as we were doing previously.)
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java Sun Jul 31 12:54:15 2005
@@ -51,6 +51,12 @@
public void abandonBlock(Block b, String src) throws IOException;
/**
+ * The client wants to abandon writing to the current file, and
+ * let anyone else grab it.
+ */
+ public void abandonFileInProgress(String src) throws IOException;
+
+ /**
* The client is done writing data to the given filename, and would
* like to complete it. Returns whether the file has been closed
* correctly (true) or whether caller should try again (false).
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Sun Jul 31 12:54:15 2005
@@ -317,6 +317,13 @@
}
/**
+ * Abandon the entire file in progress
+ */
+ public synchronized void abandonFileInProgress(UTF8 src) throws IOException {
+ internalReleaseCreate(src);
+ }
+
+ /**
* Finalize the created file and make it world-accessible. The
* FSNamesystem will already know the blocks that make up the file.
* Before we return, we make sure that all the file's blocks have
@@ -578,7 +585,10 @@
internalReleaseLock(src, holder);
}
locks.clear();
- internalReleaseCreates(creates);
+ for (Iterator it = creates.iterator(); it.hasNext(); ) {
+ UTF8 src = (UTF8) it.next();
+ internalReleaseCreate(src);
+ }
creates.clear();
}
@@ -682,14 +692,11 @@
private int internalReleaseLock(UTF8 src, UTF8 holder) {
return dir.releaseLock(src, holder);
}
- private void internalReleaseCreates(TreeSet creates) {
- for (Iterator it = creates.iterator(); it.hasNext(); ) {
- UTF8 src = (UTF8) it.next();
- Vector v = (Vector) pendingCreates.remove(src);
- for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
- Block b = (Block) it2.next();
- pendingCreateBlocks.remove(b);
- }
+ private void internalReleaseCreate(UTF8 src) {
+ Vector v = (Vector) pendingCreates.remove(src);
+ for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
+ Block b = (Block) it2.next();
+ pendingCreateBlocks.remove(b);
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Sun Jul 31 12:54:15 2005
@@ -236,16 +236,10 @@
private DataInputStream blockStream;
private DataOutputStream partnerStream;
private Block blocks[];
- private int curBlock = 0;
private DatanodeInfo nodes[][];
private long pos = 0;
- private long bytesRemainingInBlock = 0, curBlockSize = 0;
-
- private int memoryBuf[] = new int[32 * 1024];
- private long memoryStartPos = 0;
- private long openPoint = 0;
- private int memoryBytes = 0;
- private int memoryBytesStart = 0;
+ private long filelen = 0;
+ private long blockEnd = -1;
/**
*/
@@ -254,23 +248,19 @@
this.nodes = nodes;
this.blockStream = null;
this.partnerStream = null;
+ for (int i = 0; i < blocks.length; i++) {
+ this.filelen += blocks[i].getNumBytes();
+ }
}
/**
- * Open a DataInputStream to a DataNode so that it can be written to.
- * This happens when a file is created and each time a new block is allocated.
- * Must get block ID and the IDs of the destinations from the namenode.
+ * Open a DataInputStream to a DataNode so that it can be read from.
+ * We get block ID and the IDs of the destinations at startup, from the namenode.
*/
- private synchronized void nextBlockInputStream() throws IOException {
- nextBlockInputStream(0);
- }
- private synchronized void nextBlockInputStream(long preSkip) throws IOException {
- if (curBlock >= blocks.length) {
+ private synchronized void blockSeekTo(long target) throws IOException {
+ if (target >= filelen) {
throw new IOException("Attempted to read past end of file");
}
- if (bytesRemainingInBlock > 0) {
- throw new IOException("Trying to skip to next block without reading all data");
- }
if (blockStream != null) {
blockStream.close();
@@ -278,17 +268,39 @@
}
//
- // Connect to best DataNode for current Block
+ // Compute desired block
+ //
+ int targetBlock = -1;
+ long targetBlockStart = 0;
+ long targetBlockEnd = 0;
+ for (int i = 0; i < blocks.length; i++) {
+ long blocklen = blocks[i].getNumBytes();
+ targetBlockEnd = targetBlockStart + blocklen - 1;
+
+ if (target >= targetBlockStart && target <= targetBlockEnd) {
+ targetBlock = i;
+ break;
+ } else {
+ targetBlockStart = targetBlockEnd + 1;
+ }
+ }
+ if (targetBlock < 0) {
+ throw new IOException("Impossible situation: could not find target position " + target);
+ }
+ long offsetIntoBlock = target - targetBlockStart;
+
+ //
+ // Connect to best DataNode for desired Block, with potential offset
//
- InetSocketAddress target = null;
+ InetSocketAddress targetAddr = null;
Socket s = null;
TreeSet deadNodes = new TreeSet();
while (s == null) {
DatanodeInfo chosenNode;
try {
- chosenNode = bestNode(nodes[curBlock], deadNodes);
- target = DataNode.createSocketAddr(chosenNode.getName().toString());
+ chosenNode = bestNode(nodes[targetBlock], deadNodes);
+ targetAddr = DataNode.createSocketAddr(chosenNode.getName().toString());
} catch (IOException ie) {
LOG.info("Could not obtain block from any node. Retrying...");
try {
@@ -299,37 +311,34 @@
continue;
}
try {
- s = new Socket(target.getAddress(), target.getPort());
- //LOG.info("Now downloading from " + target + ", block " + blocks[curBlock] + ", skipahead " + preSkip);
+ s = new Socket(targetAddr.getAddress(), targetAddr.getPort());
//
// Xmit header info to datanode
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
out.write(OP_READSKIP_BLOCK);
- blocks[curBlock].write(out);
- out.writeLong(preSkip);
+ blocks[targetBlock].write(out);
+ out.writeLong(offsetIntoBlock);
out.flush();
//
// Get bytes in block, set streams
//
DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
- curBlockSize = in.readLong();
+ long curBlockSize = in.readLong();
long amtSkipped = in.readLong();
-
- pos += amtSkipped;
- bytesRemainingInBlock = curBlockSize - amtSkipped;
-
- if (amtSkipped > 0) {
- memoryStartPos = pos;
- memoryBytes = 0;
- memoryBytesStart = 0;
+ if (curBlockSize != blocks[targetBlock].len) {
+ throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);
+ }
+ if (amtSkipped != offsetIntoBlock) {
+ throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
}
- blockStream = in;
- partnerStream = out;
- curBlock++;
- openPoint = pos;
+
+ this.pos = target;
+ this.blockEnd = targetBlockEnd;
+ this.blockStream = in;
+ this.partnerStream = out;
} catch (IOException ex) {
// Put chosen node into dead list, continue
LOG.info("Could not connect to " + target);
@@ -340,242 +349,110 @@
}
/**
+ * Close it down!
*/
- public synchronized void seek(long pos) throws IOException {
- if (pos < 0) {
- throw new IOException("Cannot seek to negative position " + pos);
+ public synchronized void close() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
}
- if (pos == this.pos) {
- return;
+
+ if (blockStream != null) {
+ blockStream.close();
+ blockStream = null;
+ partnerStream.close();
}
+ super.close();
+ closed = true;
+ }
- //
- // If we have remembered enough bytes to seek backwards to the
- // desired pos, we can do so easily
- //
- if ((pos >= memoryStartPos) && (memoryStartPos + memoryBytes > pos)) {
- this.pos = pos;
- } else {
- //
- // If we are seeking backwards (and *don't* have enough memory bytes)
- // we need to reset the NDFS streams. They will be reopened upon the
- // next call to nextBlockInputStream(). After this operation, all
- // seeks will be "forwardSeeks".
- //
- if (pos < memoryStartPos && blockStream != null) {
- blockStream.close();
- blockStream = null;
- partnerStream.close();
- partnerStream = null;
- this.curBlock = 0;
- this.bytesRemainingInBlock = 0;
- this.pos = 0;
- this.memoryStartPos = 0;
- this.memoryBytes = 0;
- this.memoryBytesStart = 0;
- //
- // REMIND - this could be made more efficient, to just
- // skip back block-by-block
- //
+ /**
+ * Basic read()
+ */
+ public synchronized int read() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ int result = -1;
+ if (pos < filelen) {
+ if (pos > blockEnd) {
+ blockSeekTo(pos);
}
-
- //
- // Now read ahead to the desired position.
- //
- long diff = pos - this.pos;
- while (diff > 0) {
- long skipped = skip(diff);
- if (skipped > 0) {
- diff -= skipped;
- }
+ result = blockStream.read();
+ if (result >= 0) {
+ pos++;
}
- // Pos will be incremented by skip()
}
+ return result;
}
/**
- * Skip ahead some number of bytes
+ * Read the entire buffer.
*/
- public synchronized long skip(long skip) throws IOException {
- long toSkip = 0;
- long toFastSkip = 0;
- if (skip > memoryBuf.length) {
- toSkip = memoryBuf.length;
- toFastSkip = skip - toSkip;
- } else {
- toSkip = skip;
- }
- long totalSkipped = 0;
-
- //
- // If there's a lot of fast-skipping to do within the current block,
- // close it and reopen, so we can fast-skip to the target
- //
- /**
- while (toFastSkip > 0) {
- long amtSkipped = super.skip(toFastSkip);
- toFastSkip -= amtSkipped;
- totalSkipped += amtSkipped;
- }
- **/
- long realBytesRemaining = bytesRemainingInBlock + (memoryBytes - (pos - memoryStartPos));
- if (toFastSkip > 0 && realBytesRemaining > 0 &&
- toFastSkip < realBytesRemaining) {
-
- blockStream.close();
- blockStream = null;
- partnerStream.close();
- partnerStream = null;
-
- long backwardsDistance = curBlockSize - realBytesRemaining;
- pos -= backwardsDistance;
- totalSkipped -= backwardsDistance;
- toFastSkip += backwardsDistance;
- bytesRemainingInBlock = 0;
- curBlock--;
-
- memoryStartPos = pos;
- memoryBytes = 0;
- memoryBytesStart = 0;
+ public synchronized int read(byte buf[], int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
}
-
- //
- // If there's any fast-skipping to do, we do it by opening a
- // new block and telling the datanode how many bytes to skip.
- //
- while (toFastSkip > 0 && curBlock < blocks.length) {
-
- if (bytesRemainingInBlock > 0) {
- blockStream.close();
- blockStream = null;
- partnerStream.close();
- partnerStream = null;
-
- pos += bytesRemainingInBlock;
- totalSkipped += bytesRemainingInBlock;
- toFastSkip -= bytesRemainingInBlock;
- bytesRemainingInBlock = 0;
+ if (pos < filelen) {
+ if (pos > blockEnd) {
+ blockSeekTo(pos);
}
-
- long oldPos = pos;
- nextBlockInputStream(toFastSkip);
- long forwardDistance = (pos - oldPos);
- totalSkipped += forwardDistance;
- toFastSkip -= (pos - oldPos);
-
- memoryStartPos = pos;
- memoryBytes = 0;
- memoryBytesStart = 0;
- }
-
- //
- // If there's any remaining toFastSkip, well, there's
- // not much we can do about it. We're at the end of
- // the stream!
- //
- if (toFastSkip > 0) {
- System.err.println("Trying to skip past end of file....");
- toFastSkip = 0;
+ int result = blockStream.read(buf, off, len);
+ if (result >= 0) {
+ pos += result;
+ }
+ return result;
}
-
- //
- // Do a slow skip as we approach, so we can fill the client
- // history buffer
- //
- totalSkipped += super.skip(toSkip);
- toSkip = 0;
- return totalSkipped;
+ return -1;
}
/**
+ * Seek to a new arbitrary location
*/
- public synchronized long getPos() throws IOException {
- return pos;
+ public synchronized void seek(long targetPos) throws IOException {
+ if (targetPos >= filelen) {
+ throw new IOException("Cannot seek after EOF");
+ }
+ if (targetPos >= pos && targetPos <= blockEnd) {
+ skip(targetPos - pos);
+ } else {
+ pos = targetPos;
+ blockEnd = -1;
+ }
}
/**
+ * Skip ahead some number of bytes
*/
- public synchronized int available() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
+ public synchronized long skip(long skip) throws IOException {
+ if (skip > 0) {
+ long targetPos = pos + skip;
+ targetPos = Math.min(targetPos, filelen);
+
+ if (targetPos <= blockEnd) {
+ return blockStream.skip(skip);
+ } else {
+ pos = targetPos;
+ blockEnd = -1;
+ return skip;
+ }
+ } else {
+ return 0;
}
- return (int) Math.min((long) Integer.MAX_VALUE, bytesRemainingInBlock);
}
/**
*/
- public synchronized void close() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
-
- if (blockStream != null) {
- blockStream.close();
- blockStream = null;
- partnerStream.close();
- }
- super.close();
- closed = true;
+ public synchronized long getPos() throws IOException {
+ return pos;
}
/**
- * Other read() functions are implemented in terms of
- * this one.
*/
- public synchronized int read() throws IOException {
+ public synchronized int available() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
-
- int b = 0;
- if (pos - memoryStartPos < memoryBytes) {
- //
- // Move the memoryStartPos up to current pos, if necessary.
- //
- int diff = (int) (pos - memoryStartPos);
-
- //
- // Fetch the byte
- //
- b = memoryBuf[(memoryBytesStart + diff) % memoryBuf.length];
-
- //
- // Bump the pos
- //
- pos++;
- } else {
- if (bytesRemainingInBlock == 0) {
- if (curBlock < blocks.length) {
- nextBlockInputStream();
- } else {
- return -1;
- }
- }
- b = blockStream.read();
- if (b >= 0) {
- //
- // Remember byte so we can seek backwards at some later time
- //
- if (memoryBytes == memoryBuf.length) {
- memoryStartPos++;
- }
-
- if (memoryBuf.length > 0) {
- int target;
- if (memoryBytes == memoryBuf.length) {
- target = memoryBytesStart;
- memoryBytesStart = (memoryBytesStart + 1) % memoryBuf.length;
- } else {
- target = (memoryBytesStart + memoryBytes) % memoryBuf.length;
- memoryBytes++;
- }
- memoryBuf[target] = b;
- }
- bytesRemainingInBlock--;
- pos++;
- }
- }
- return b;
+ return (int) (filelen - pos);
}
/**
@@ -661,7 +538,6 @@
}
} catch (InterruptedException ie) {
}
-
} else {
blockComplete = true;
}
@@ -676,7 +552,6 @@
InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());
Socket s = null;
try {
- //System.err.println("Trying to connect to " + target);
s = new Socket(target.getAddress(), target.getPort());
} catch (IOException ie) {
// Connection failed. Let's wait a little bit and retry
@@ -687,7 +562,11 @@
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
- namenode.abandonBlock(block, src.toString());
+ if (firstTime) {
+ namenode.abandonFileInProgress(src.toString());
+ } else {
+ namenode.abandonBlock(block, src.toString());
+ }
retry = true;
continue;
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Sun Jul 31 12:54:15 2005
@@ -123,6 +123,9 @@
throw new IOException("Cannot abandon block during write to " + src);
}
}
+ public void abandonFileInProgress(String src) throws IOException {
+ namesystem.abandonFileInProgress(new UTF8(src));
+ }
public boolean complete(String src, String clientName) throws IOException {
int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));
if (returnCode == STILL_WAITING) {