You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC
svn commit: r885143 [7/18] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/
src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs/...
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java Sat Nov 28 20:05:56 2009
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -36,8 +37,6 @@
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -65,12 +64,15 @@
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -84,6 +86,9 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -103,8 +108,6 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.InvalidAccessTokenException;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
@@ -125,12 +128,15 @@
********************************************************/
public class DFSClient implements FSConstants, java.io.Closeable {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
+ public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
private final ClientProtocol namenode;
private final ClientProtocol rpcNamenode;
final UnixUserGroupInformation ugi;
volatile boolean clientRunning = true;
+ private volatile FsServerDefaults serverDefaults;
+ private volatile long serverDefaultsLastUpdate;
Random r = new Random();
final String clientName;
final LeaseChecker leasechecker = new LeaseChecker();
@@ -245,13 +251,14 @@
throws IOException {
this.conf = conf;
this.stats = stats;
- this.socketTimeout = conf.getInt("dfs.socket.timeout",
+ this.socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
- this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+ this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
this.maxBlockAcquireFailures =
conf.getInt("dfs.client.max.block.acquire.failures",
MAX_BLOCK_ACQUIRE_FAILURES);
@@ -270,7 +277,7 @@
} else {
this.clientName = "DFSClient_" + r.nextInt();
}
- defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);
if (nameNodeAddr != null && rpcNamenode == null) {
@@ -337,6 +344,18 @@
}
/**
+ * Get server default values for a number of configuration params.
+ */
+ public FsServerDefaults getServerDefaults() throws IOException {
+ long now = System.currentTimeMillis();
+ if (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD) {
+ serverDefaults = namenode.getServerDefaults();
+ serverDefaultsLastUpdate = now;
+ }
+ return serverDefaults;
+ }
+
+ /**
* Report corrupt blocks that were discovered by the client.
*/
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
@@ -408,10 +427,24 @@
* namenode, and then reads from all the right places. Creates
* inner subclass of InputStream that does the right out-of-band
* work.
+ * @deprecated Use {@link #open(String, int, boolean)} instead.
*/
- DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
+ @Deprecated
+ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
FileSystem.Statistics stats
) throws IOException {
+ return open(src, buffersize, verifyChecksum);
+ }
+
+
+ /**
+ * Create an input stream that obtains a nodelist from the
+ * namenode, and then reads from all the right places. Creates
+ * inner subclass of InputStream that does the right out-of-band
+ * work.
+ */
+ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum
+ ) throws IOException {
checkOpen();
// Get block info from namenode
return new DFSInputStream(src, buffersize, verifyChecksum);
@@ -513,6 +546,23 @@
}
/**
+ * Call
+ * {@link #create(String,FsPermission,EnumSet,boolean,short,long,Progressable,int)}
+ * with createParent set to true.
+ */
+ public OutputStream create(String src,
+ FsPermission permission,
+ EnumSet<CreateFlag> flag,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ int buffersize
+ ) throws IOException {
+ return create(src, permission, flag, true,
+ replication, blockSize, progress, buffersize);
+ }
+
+ /**
* Create a new dfs file with the specified block replication
* with write-progress reporting and return an output stream for writing
* into the file.
@@ -521,14 +571,16 @@
* @param permission The permission of the file being created.
* If permission == null, use {@link FsPermission#getDefault()}.
* @param flag do not check for file existence if true
+ * @param createParent create missing parent directory if true
* @param replication block replication
* @return output stream
* @throws IOException
- * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, short, long)
+ * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
*/
public OutputStream create(String src,
FsPermission permission,
EnumSet<CreateFlag> flag,
+ boolean createParent,
short replication,
long blockSize,
Progressable progress,
@@ -541,11 +593,36 @@
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src + ": masked=" + masked);
OutputStream result = new DFSOutputStream(src, masked,
- flag, replication, blockSize, progress, buffersize,
- conf.getInt("io.bytes.per.checksum", 512));
+ flag, createParent, replication, blockSize, progress, buffersize,
+ conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
leasechecker.put(src, result);
return result;
}
+
+ /**
+ * Same as {@link #create(String, FsPermission, EnumSet, short, long,
+ * Progressable, int)} except that the permission
+ * is absolute (i.e. has already been masked with the umask).
+ *
+ */
+ public OutputStream primitiveCreate(String src,
+ FsPermission absPermission,
+ EnumSet<CreateFlag> flag,
+ boolean createParent,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ int buffersize,
+ int bytesPerChecksum)
+ throws IOException {
+ checkOpen();
+ OutputStream result = new DFSOutputStream(src, absPermission,
+ flag, createParent, replication, blockSize, progress, buffersize,
+ bytesPerChecksum);
+ leasechecker.put(src, result);
+ return result;
+ }
/**
* Append to an existing HDFS file.
@@ -572,7 +649,8 @@
DSQuotaExceededException.class);
}
OutputStream result = new DFSOutputStream(src, buffersize, progress,
- lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
+ lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
leasechecker.put(src, result);
return result;
}
@@ -599,8 +677,10 @@
/**
* Rename file or directory.
- * See {@link ClientProtocol#rename(String, String)}.
+ * See {@link ClientProtocol#rename(String, String)}.
+ * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
*/
+ @Deprecated
public boolean rename(String src, String dst) throws IOException {
checkOpen();
try {
@@ -613,6 +693,34 @@
}
/**
+ * Move blocks from srcs to trg and delete srcs.
+ * See {@link ClientProtocol#concat(String, String [])}.
+ */
+ public void concat(String trg, String [] srcs) throws IOException {
+ checkOpen();
+ try {
+ namenode.concat(trg, srcs);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ NSQuotaExceededException.class,
+ DSQuotaExceededException.class);
+ }
+ }
+ /**
+ * Rename file or directory.
+ * See {@link ClientProtocol#rename(String, String, Options.Rename...)}
+ */
+ public void rename(String src, String dst, Options.Rename... options) throws IOException {
+ checkOpen();
+ try {
+ namenode.rename(src, dst, options);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ NSQuotaExceededException.class,
+ DSQuotaExceededException.class);
+ }
+ }
+ /**
* Delete file or directory.
* See {@link ClientProtocol#delete(String)}.
*/
@@ -667,7 +775,7 @@
* @return The checksum
* @see DistributedFileSystem#getFileChecksum(Path)
*/
- MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+ public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
return getFileChecksum(src, namenode, socketFactory, socketTimeout);
}
@@ -941,8 +1049,9 @@
/**
*/
+ @Deprecated
public boolean mkdirs(String src) throws IOException {
- return mkdirs(src, null);
+ return mkdirs(src, null, true);
}
/**
@@ -952,10 +1061,11 @@
* @param src The path of the directory being created
* @param permission The permission of the directory being created.
* If permission == null, use {@link FsPermission#getDefault()}.
+ * @param createParent create missing parent directory if true
* @return True if the operation success.
- * @see ClientProtocol#mkdirs(String, FsPermission)
+ * @see ClientProtocol#mkdirs(String, FsPermission, boolean)
*/
- public boolean mkdirs(String src, FsPermission permission)throws IOException{
+ public boolean mkdirs(String src, FsPermission permission, boolean createParent)throws IOException{
checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
@@ -963,7 +1073,31 @@
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src + ": masked=" + masked);
try {
- return namenode.mkdirs(src, masked);
+ return namenode.mkdirs(src, masked, createParent);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ NSQuotaExceededException.class,
+ DSQuotaExceededException.class,
+ FileNotFoundException.class,
+ FileAlreadyExistsException.class);
+ }
+ }
+
+ /**
+ * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+ * that the permission has already been masked against the umask.
+ */
+ public boolean primitiveMkdir(String src, FsPermission absPermission)
+ throws IOException{
+ checkOpen();
+ if (absPermission == null) {
+ absPermission =
+ FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
+ }
+
+ LOG.debug(src + ": masked=" + absPermission);
+ try {
+ return namenode.mkdirs(src, absPermission, true);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
NSQuotaExceededException.class,
@@ -1081,10 +1215,17 @@
}
}
- synchronized void close() {
- while (!pendingCreates.isEmpty()) {
- String src = pendingCreates.firstKey();
- OutputStream out = pendingCreates.remove(src);
+ void close() {
+ while (true) {
+ String src;
+ OutputStream out;
+ synchronized (this) {
+ if (pendingCreates.isEmpty()) {
+ return;
+ }
+ src = pendingCreates.firstKey();
+ out = pendingCreates.remove(src);
+ }
if (out != null) {
try {
out.close();
@@ -1349,8 +1490,8 @@
int dataLen = in.readInt();
// Sanity check the lengths
- if ( dataLen < 0 ||
- ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+ if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+ ( dataLen != 0 && lastPacketInBlock) ||
(seqno != (lastSeqNo + 1)) ) {
throw new IOException("BlockReader: error in packet header" +
"(chunkOffset : " + chunkOffset +
@@ -1414,7 +1555,7 @@
checksumSize = this.checksum.getChecksumSize();
}
- public static BlockReader newBlockReader(Socket sock, String file, long blockId, AccessToken accessToken,
+ public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken,
long genStamp, long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
true);
@@ -1422,7 +1563,7 @@
/** Java Doc required */
public static BlockReader newBlockReader( Socket sock, String file, long blockId,
- AccessToken accessToken,
+ BlockAccessToken accessToken,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum)
@@ -1433,7 +1574,7 @@
public static BlockReader newBlockReader( Socket sock, String file,
long blockId,
- AccessToken accessToken,
+ BlockAccessToken accessToken,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
@@ -1529,6 +1670,7 @@
private BlockReader blockReader = null;
private boolean verifyChecksum;
private LocatedBlocks locatedBlocks = null;
+ private long lastBlockBeingWrittenLength = 0;
private DatanodeInfo currentNode = null;
private Block currentBlock = null;
private long pos = 0;
@@ -1552,7 +1694,7 @@
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
- prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
+ prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
openInfo();
}
@@ -1561,6 +1703,9 @@
*/
synchronized void openInfo() throws IOException {
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("newInfo = " + newInfo);
+ }
if (newInfo == null) {
throw new IOException("Cannot open filename " + src);
}
@@ -1575,11 +1720,46 @@
}
}
this.locatedBlocks = newInfo;
+ this.lastBlockBeingWrittenLength = 0;
+ if (!locatedBlocks.isLastBlockComplete()) {
+ final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+ if (last != null) {
+ final long len = readBlockLength(last);
+ last.getBlock().setNumBytes(len);
+ this.lastBlockBeingWrittenLength = len;
+ }
+ }
+
this.currentNode = null;
}
+
+ /** Read the block length from one of the datanodes. */
+ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+ if (locatedblock == null || locatedblock.getLocations().length == 0) {
+ return 0;
+ }
+ for(DatanodeInfo datanode : locatedblock.getLocations()) {
+ try {
+ final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
+ datanode, conf);
+ final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+ if (n >= 0) {
+ return n;
+ }
+ }
+ catch(IOException ioe) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Faild to getReplicaVisibleLength from datanode "
+ + datanode + " for block " + locatedblock.getBlock(), ioe);
+ }
+ }
+ }
+ throw new IOException("Cannot obtain block length for " + locatedblock);
+ }
public synchronized long getFileLength() {
- return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
+ return locatedBlocks == null? 0:
+ locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
}
/**
@@ -1615,17 +1795,36 @@
private synchronized LocatedBlock getBlockAt(long offset,
boolean updatePosition) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
- // search cached blocks first
- int targetBlockIdx = locatedBlocks.findBlock(offset);
- if (targetBlockIdx < 0) { // block is not cached
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
- // fetch more blocks
- LocatedBlocks newBlocks;
- newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
- assert (newBlocks != null) : "Could not find target position " + offset;
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+
+ final LocatedBlock blk;
+
+ //check offset
+ if (offset < 0 || offset >= getFileLength()) {
+ throw new IOException("offset < 0 || offset > getFileLength(), offset="
+ + offset
+ + ", updatePosition=" + updatePosition
+ + ", locatedBlocks=" + locatedBlocks);
+ }
+ else if (offset >= locatedBlocks.getFileLength()) {
+ // offset to the portion of the last block,
+ // which is not known to the name-node yet;
+ // getting the last block
+ blk = locatedBlocks.getLastLocatedBlock();
+ }
+ else {
+ // search cached blocks first
+ int targetBlockIdx = locatedBlocks.findBlock(offset);
+ if (targetBlockIdx < 0) { // block is not cached
+ targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+ // fetch more blocks
+ LocatedBlocks newBlocks;
+ newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+ assert (newBlocks != null) : "Could not find target position " + offset;
+ locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+ }
+ blk = locatedBlocks.get(targetBlockIdx);
}
- LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
+
// update current position
if (updatePosition) {
this.pos = offset;
@@ -1662,6 +1861,27 @@
private synchronized List<LocatedBlock> getBlockRange(long offset,
long length)
throws IOException {
+ final List<LocatedBlock> blocks;
+ if (locatedBlocks.isLastBlockComplete()) {
+ blocks = getFinalizedBlockRange(offset, length);
+ }
+ else {
+ if (length + offset > locatedBlocks.getFileLength()) {
+ length = locatedBlocks.getFileLength() - offset;
+ }
+ blocks = getFinalizedBlockRange(offset, length);
+ blocks.add(locatedBlocks.getLastLocatedBlock());
+ }
+ return blocks;
+ }
+
+ /**
+ * Get blocks in the specified range.
+ * Includes only the complete blocks.
+ * Fetch them from the namenode if not cached.
+ */
+ private synchronized List<LocatedBlock> getFinalizedBlockRange(
+ long offset, long length) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
// search cached blocks first
@@ -1715,6 +1935,8 @@
//
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
+ failures = 0;
+
while (true) {
//
// Compute desired block
@@ -1732,7 +1954,7 @@
NetUtils.connect(s, targetAddr, socketTimeout);
s.setSoTimeout(socketTimeout);
Block blk = targetBlock.getBlock();
- AccessToken accessToken = targetBlock.getAccessToken();
+ BlockAccessToken accessToken = targetBlock.getAccessToken();
blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
accessToken,
@@ -1944,6 +2166,7 @@
//
Socket dn = null;
int refetchToken = 1; // only need to get a new access token once
+ failures = 0;
while (true) {
// cached block locations may have been updated by chooseDataNode()
@@ -1959,7 +2182,7 @@
dn = socketFactory.createSocket();
NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
- AccessToken accessToken = block.getAccessToken();
+ BlockAccessToken accessToken = block.getAccessToken();
int len = (int) (end - start + 1);
@@ -2139,14 +2362,18 @@
return pos;
}
- /**
+ /** Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
*/
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
- return (int) (getFileLength() - pos);
+
+ final long remaining = getFileLength() - pos;
+ return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE;
}
/**
@@ -2165,8 +2392,8 @@
}
}
- static class DFSDataInputStream extends FSDataInputStream {
- DFSDataInputStream(DFSInputStream in)
+ public static class DFSDataInputStream extends FSDataInputStream {
+ public DFSDataInputStream(DFSInputStream in)
throws IOException {
super(in);
}
@@ -2229,7 +2456,7 @@
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private Packet currentPacket = null;
- private DataStreamer streamer = new DataStreamer();
+ private DataStreamer streamer;
private long currentSeqno = 0;
private long bytesCurBlock = 0; // bytes writen in current block
private int packetSize = 0; // write packet size, including the header.
@@ -2336,6 +2563,18 @@
buffer.reset();
return buffer;
}
+
+ // get the packet's last byte's offset in the block
+ long getLastByteOffsetBlock() {
+ return offsetInBlock + dataPos - dataStart;
+ }
+
+ public String toString() {
+ return "packet seqno:" + this.seqno +
+ " offsetInBlock:" + this.offsetInBlock +
+ " lastPacketInBlock:" + this.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+ }
}
//
@@ -2347,18 +2586,102 @@
// if they are received, the DataStreamer closes the current block.
//
class DataStreamer extends Daemon {
- private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
- private int recoveryErrorCount = 0; // number of times block recovery failed
private volatile boolean streamerClosed = false;
- private Block block;
- private AccessToken accessToken;
+ private Block block; // its length is number of bytes acked
+ private BlockAccessToken accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+ private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
volatile boolean hasError = false;
- volatile int errorIndex = 0;
-
+ volatile int errorIndex = -1;
+ private BlockConstructionStage stage; // block construction stage
+ private long bytesSent = 0; // number of bytes that've been sent
+
+ /**
+ * Default constructor for file create
+ */
+ private DataStreamer() {
+ stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+ }
+
+ /**
+ * Construct a data streamer for append
+ * @param lastBlock last block of the file to be appended
+ * @param stat status of the file to be appended
+ * @param bytesPerChecksum number of bytes per checksum
+ * @throws IOException if error occurs
+ */
+ private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
+ int bytesPerChecksum) throws IOException {
+ stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+ block = lastBlock.getBlock();
+ bytesSent = block.getNumBytes();
+ accessToken = lastBlock.getAccessToken();
+ long usedInLastBlock = stat.getLen() % blockSize;
+ int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+ // calculate the amount of free space in the pre-existing
+ // last crc chunk
+ int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+ int freeInCksum = bytesPerChecksum - usedInCksum;
+
+ // if there is space in the last block, then we have to
+ // append to that block
+ if (freeInLastBlock == blockSize) {
+ throw new IOException("The last block for file " +
+ src + " is full.");
+ }
+
+ if (usedInCksum > 0 && freeInCksum > 0) {
+ // if there is space in the last partial chunk, then
+ // setup in such a way that the next packet will have only
+ // one chunk that fills up the partial chunk.
+ //
+ computePacketChunkSize(0, freeInCksum);
+ resetChecksumChunk(freeInCksum);
+ appendChunk = true;
+ } else {
+ // if the remaining space in the block is smaller than
+ // the expected size of a packet, then create
+ // a smaller packet.
+ //
+ computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock),
+ bytesPerChecksum);
+ }
+
+ // setup pipeline to append to the last block XXX retries??
+ nodes = lastBlock.getLocations();
+ errorIndex = -1; // no errors yet.
+ if (nodes.length < 1) {
+ throw new IOException("Unable to retrieve blocks locations " +
+ " for last block " + block +
+ "of file " + src);
+
+ }
+ }
+
+ /**
+ * Initialize for data streaming
+ */
+ private void initDataStreaming() {
+ this.setName("DataStreamer for file " + src +
+ " block " + block);
+ response = new ResponseProcessor(nodes);
+ response.start();
+ stage = BlockConstructionStage.DATA_STREAMING;
+ }
+
+ private void endBlock() {
+ LOG.debug("Closing old block " + block);
+ this.setName("DataStreamer for file " + src);
+ closeResponder();
+ closeStream();
+ nodes = null;
+ stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+ }
+
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
@@ -2378,47 +2701,69 @@
Packet one = null;
- // process IO errors if any
- boolean doSleep = processDatanodeError(hasError, false);
+ try {
+ // process datanode IO errors if any
+ boolean doSleep = false;
+ if (hasError && errorIndex>=0) {
+ doSleep = processDatanodeError();
+ }
- synchronized (dataQueue) {
- // wait for a packet to be sent.
- while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
- try {
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
+ synchronized (dataQueue) {
+ // wait for a packet to be sent.
+ while ((!streamerClosed && !hasError && clientRunning
+ && dataQueue.size() == 0) || doSleep) {
+ try {
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ doSleep = false;
}
- doSleep = false;
- }
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
- continue;
+ if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ continue;
+ }
+ // get packet to be sent.
+ one = dataQueue.getFirst();
}
- // get packet to be sent.
- one = dataQueue.getFirst();
- }
-
- try {
- long offsetInBlock = one.offsetInBlock;
// get new block from namenode.
- if (blockStream == null) {
+ if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block");
- nodes = nextBlockOutputStream(src);
- this.setName("DataStreamer for file " + src +
- " block " + block);
- response = new ResponseProcessor(nodes);
- response.start();
+ nodes = nextBlockOutputStream(src);
+ initDataStreaming();
+ } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+ LOG.debug("Append to block " + block);
+ setupPipelineForAppendOrRecovery();
+ initDataStreaming();
}
- if (offsetInBlock >= blockSize) {
+ long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+ if (lastByteOffsetInBlock > blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
- offsetInBlock +
+ lastByteOffsetInBlock +
" Aborting file " + src);
}
+ if (one.lastPacketInBlock) {
+ // wait until all data packets have been successfully acked
+ synchronized (dataQueue) {
+ while (!streamerClosed && !hasError &&
+ ackQueue.size() != 0 && clientRunning) {
+ try {
+ // wait for acks to arrive from datanodes
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+ stage = BlockConstructionStage.PIPELINE_CLOSE;
+ }
+
+ // send the packet
ByteBuffer buf = one.getBuffer();
synchronized (dataQueue) {
@@ -2428,19 +2773,45 @@
dataQueue.notifyAll();
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DataStreamer block " + block +
+ " sending packet " + one);
+ }
+
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
+ blockStream.flush();
+
+ // update bytesSent
+ long tmpBytesSent = one.getLastByteOffsetBlock();
+ if (bytesSent < tmpBytesSent) {
+ bytesSent = tmpBytesSent;
+ }
+
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+ // Is this block full?
if (one.lastPacketInBlock) {
- blockStream.writeInt(0); // indicate end-of-block
+ // wait until the close packet has been acked
+ synchronized (dataQueue) {
+ while (!streamerClosed && !hasError &&
+ ackQueue.size() != 0 && clientRunning) {
+ dataQueue.wait(1000);// wait for acks to arrive from datanodes
+ }
+ }
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+
+ endBlock();
}
- blockStream.flush();
- if (LOG.isDebugEnabled()) {
- LOG.debug("DataStreamer block " + block +
- " wrote packet seqno:" + one.seqno +
- " size:" + buf.remaining() +
- " offsetInBlock:" + one.offsetInBlock +
- " lastPacketInBlock:" + one.lastPacketInBlock);
+ if (progress != null) { progress.progress(); }
+
+ // This is used by unit test to trigger race conditions.
+ if (artificialSlowdown != 0 && clientRunning) {
+ Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
LOG.warn("DataStreamer Exception: " +
@@ -2449,47 +2820,16 @@
setLastException((IOException)e);
}
hasError = true;
- }
-
-
- if (streamerClosed || hasError || !clientRunning) {
- continue;
- }
-
- // Is this block full?
- if (one.lastPacketInBlock) {
- synchronized (dataQueue) {
- while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
- try {
- dataQueue.wait(1000); // wait for acks to arrive from datanodes
- } catch (InterruptedException e) {
- }
- }
- }
- if (streamerClosed || hasError || !clientRunning) {
- continue;
+ if (errorIndex == -1) { // not a datanode error
+ streamerClosed = true;
}
-
- LOG.debug("Closing old block " + block);
- this.setName("DataStreamer for file " + src);
- closeResponder();
- closeStream();
- nodes = null;
- }
- if (progress != null) { progress.progress(); }
-
- // This is used by unit test to trigger race conditions.
- if (artificialSlowdown != 0 && clientRunning) {
- try {
- Thread.sleep(artificialSlowdown);
- } catch (InterruptedException e) {}
}
}
closeInternal();
}
private void closeInternal() {
- closeResponder();
+ closeResponder(); // close and join
closeStream();
streamerClosed = true;
closed = true;
@@ -2502,10 +2842,16 @@
* close both streamer and DFSOutputStream, should be called only
* by an external thread and only after all data to be sent has
* been flushed to datanode.
+ *
+ * Interrupt this data streamer if force is true
+ *
+ * @param force true if this data streamer should also be interrupted
*/
- void close() {
+ void close(boolean force) {
streamerClosed = true;
- this.interrupt();
+ if (force) {
+ this.interrupt();
+ }
}
private void closeResponder() {
@@ -2563,12 +2909,12 @@
// verify seqno from datanode
long seqno = blockReplyStream.readLong();
LOG.debug("DFSClient received ack for seqno " + seqno);
+ Packet one = null;
if (seqno == -1) {
continue;
} else if (seqno == -2) {
// no nothing
} else {
- Packet one = null;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
@@ -2581,10 +2927,20 @@
}
// processes response status from all datanodes.
+ String replies = null;
+ if (LOG.isDebugEnabled()) {
+ replies = "DFSClient Replies for seqno " + seqno + " are";
+ }
for (int i = 0; i < targets.length && clientRunning; i++) {
final DataTransferProtocol.Status reply
= DataTransferProtocol.Status.read(blockReplyStream);
+ if (LOG.isDebugEnabled()) {
+ replies += " " + reply;
+ }
if (reply != SUCCESS) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
+ }
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2593,6 +2949,18 @@
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
+ }
+
+ if (one == null) {
+ throw new IOException("Panic: responder did not receive " +
+ "an ack for a packet: " + seqno);
+ }
+
+ // update bytesAcked
+ block.setNumBytes(one.getLastByteOffsetBlock());
+
synchronized (dataQueue) {
ackQueue.removeFirst();
dataQueue.notifyAll();
@@ -2603,6 +2971,7 @@
setLastException((IOException)e);
}
hasError = true;
+ errorIndex = errorIndex==-1 ? 0 : errorIndex;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
@@ -2625,21 +2994,12 @@
// threads and mark stream as closed. Returns true if we should
// sleep for a while after returning from this call.
//
- private boolean processDatanodeError(boolean error, boolean isAppend) {
- if (!error) {
- return false;
- }
+ private boolean processDatanodeError() throws IOException {
if (response != null) {
LOG.info("Error Recovery for block " + block +
" waiting for responder to exit. ");
return true;
}
- if (errorIndex >= 0) {
- LOG.warn("Error Recovery for block " + block
- + " bad datanode[" + errorIndex + "] "
- + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
- }
-
closeStream();
// move packets from ack queue to front of the data queue
@@ -2648,31 +3008,57 @@
ackQueue.clear();
}
- boolean success = false;
- while (!success && !streamerClosed && clientRunning) {
- DatanodeInfo[] newnodes = null;
- if (nodes == null) {
- String msg = "Could not get block locations. " + "Source file \""
- + src + "\" - Aborting...";
- LOG.warn(msg);
- setLastException(new IOException(msg));
- streamerClosed = true;
- return false;
- }
- StringBuilder pipelineMsg = new StringBuilder();
- for (int j = 0; j < nodes.length; j++) {
- pipelineMsg.append(nodes[j].getName());
- if (j < nodes.length - 1) {
- pipelineMsg.append(", ");
+ boolean doSleep = setupPipelineForAppendOrRecovery();
+
+ if (!streamerClosed && clientRunning) {
+ if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+ synchronized (dataQueue) {
+ dataQueue.remove(); // remove the end of block packet
+ dataQueue.notifyAll();
}
+ endBlock();
+ } else {
+ initDataStreaming();
}
+ }
+
+ return doSleep;
+ }
+
+
+ /**
+ * Open a DataOutputStream to a DataNode pipeline so that
+ * it can be written to.
+ * This happens when a file is appended or data streaming fails.
+ * It keeps on trying until a pipeline is set up.
+ */
+ private boolean setupPipelineForAppendOrRecovery() throws IOException {
+ // check number of datanodes
+ if (nodes == null || nodes.length == 0) {
+ String msg = "Could not get block locations. " + "Source file \""
+ + src + "\" - Aborting...";
+ LOG.warn(msg);
+ setLastException(new IOException(msg));
+ streamerClosed = true;
+ return false;
+ }
+
+ boolean success = false;
+ long newGS = 0L;
+ while (!success && !streamerClosed && clientRunning) {
+ boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
- if (errorIndex < 0) {
- newnodes = nodes;
- } else {
+ if (errorIndex >= 0) {
+ StringBuilder pipelineMsg = new StringBuilder();
+ for (int j = 0; j < nodes.length; j++) {
+ pipelineMsg.append(nodes[j].getName());
+ if (j < nodes.length - 1) {
+ pipelineMsg.append(", ");
+ }
+ }
if (nodes.length <= 1) {
lastException = new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting...");
@@ -2682,86 +3068,32 @@
LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
- newnodes = new DatanodeInfo[nodes.length-1];
+ DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
+ nodes = newnodes;
+ this.hasError = false;
+ lastException = null;
+ errorIndex = -1;
}
- // Tell the primary datanode to do error recovery
- // by stamping appropriate generation stamps.
- //
- LocatedBlock newBlock = null;
- ClientDatanodeProtocol primary = null;
- DatanodeInfo primaryNode = null;
- try {
- // Pick the "least" datanode as the primary datanode to avoid deadlock.
- primaryNode = Collections.min(Arrays.asList(newnodes));
- primary = createClientDatanodeProtocolProxy(primaryNode, conf);
- newBlock = primary.recoverBlock(block, isAppend, newnodes);
- } catch (IOException e) {
- recoveryErrorCount++;
- if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
- if (nodes.length > 1) {
- // if the primary datanode failed, remove it from the list.
- // The original bad datanode is left in the list because it is
- // conservative to remove only one datanode in one iteration.
- for (int j = 0; j < nodes.length; j++) {
- if (nodes[j].equals(primaryNode)) {
- errorIndex = j; // forget original bad node.
- }
- }
- // remove primary node from list
- newnodes = new DatanodeInfo[nodes.length-1];
- System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
- System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
- newnodes.length-errorIndex);
- nodes = newnodes;
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg
- + ". Marking primary datanode as bad.");
- recoveryErrorCount = 0;
- errorIndex = -1;
- return true; // sleep when we return from here
- }
- String emsg = "Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Aborting...";
- LOG.warn(emsg);
- lastException = new IOException(emsg);
- streamerClosed = true;
- return false; // abort with IOexception
- }
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Will retry...");
- return true; // sleep when we return from here
- } finally {
- RPC.stopProxy(primary);
- }
- recoveryErrorCount = 0; // block recovery successful
-
- // If the block recovery generated a new generation stamp, use that
- // from now on. Also, setup new pipeline
- // newBlock should never be null and it should contain a newly
- // generated access token.
- block = newBlock.getBlock();
- accessToken = newBlock.getAccessToken();
- nodes = newBlock.getLocations();
-
- this.hasError = false;
- lastException = null;
- errorIndex = 0;
- success = createBlockOutputStream(nodes, clientName, true);
+ // get a new generation stamp and an access token
+ LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
+ newGS = lb.getBlock().getGenerationStamp();
+ accessToken = lb.getAccessToken();
+
+ // set up the pipeline again with the remaining nodes
+ success = createBlockOutputStream(nodes, newGS, isRecovery);
}
- if (!streamerClosed && clientRunning) {
- response = new ResponseProcessor(nodes);
- response.start();
+ if (success) {
+ // update pipeline at the namenode
+ Block newBlock = new Block(
+ block.getBlockId(), block.getNumBytes(), newGS);
+ namenode.updatePipeline(clientName, block, newBlock, nodes);
+ // update client side generation stamp
+ block = newBlock;
}
return false; // do not sleep, continue processing
}
@@ -2781,24 +3113,31 @@
do {
hasError = false;
lastException = null;
- errorIndex = 0;
+ errorIndex = -1;
retry = false;
success = false;
long startTime = System.currentTimeMillis();
- lb = locateFollowingBlock(startTime);
+ DatanodeInfo[] w = excludedNodes.toArray(
+ new DatanodeInfo[excludedNodes.size()]);
+ lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
block = lb.getBlock();
+ block.setNumBytes(0);
accessToken = lb.getAccessToken();
nodes = lb.getLocations();
//
// Connect to first DataNode in the list.
//
- success = createBlockOutputStream(nodes, clientName, false);
+ success = createBlockOutputStream(nodes, 0L, false);
if (!success) {
LOG.info("Abandoning block " + block);
namenode.abandonBlock(block, src, clientName);
+ block = null;
+
+ LOG.info("Excluding datanode " + nodes[errorIndex]);
+ excludedNodes.add(nodes[errorIndex]);
// Connection failed. Let's wait a little bit and retry
retry = true;
@@ -2806,6 +3145,7 @@
if (System.currentTimeMillis() - startTime > 5000) {
LOG.info("Waiting to find target node: " + nodes[0].getName());
}
+ //TODO fix this timeout. Extract it to a constant, maybe make it available from conf
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
@@ -2821,7 +3161,7 @@
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
- private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
DataTransferProtocol.Status pipelineStatus = SUCCESS;
String firstBadLink = "";
@@ -2856,9 +3196,11 @@
DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
+ // send the request
DataTransferProtocol.Sender.opWriteBlock(out,
- block.getBlockId(), block.getGenerationStamp(), nodes.length,
- recoveryFlag, client, null, nodes, accessToken);
+ block.getBlockId(), block.getGenerationStamp(),
+ nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+ block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
checksum.writeHeader(out);
out.flush();
@@ -2891,6 +3233,8 @@
break;
}
}
+ } else {
+ errorIndex = 0;
}
hasError = true;
setLastException(ie);
@@ -2899,14 +3243,15 @@
}
}
- private LocatedBlock locateFollowingBlock(long start) throws IOException {
+ private LocatedBlock locateFollowingBlock(long start,
+ DatanodeInfo[] excludedNodes) throws IOException {
int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
try {
- return namenode.addBlock(src, clientName);
+ return namenode.addBlock(src, clientName, block, excludedNodes);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
@@ -2947,59 +3292,15 @@
}
}
- void initAppend(LocatedBlock lastBlock, FileStatus stat,
- int bytesPerChecksum) throws IOException {
- block = lastBlock.getBlock();
- accessToken = lastBlock.getAccessToken();
- long usedInLastBlock = stat.getLen() % blockSize;
- int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
- // calculate the amount of free space in the pre-existing
- // last crc chunk
- int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
- int freeInCksum = bytesPerChecksum - usedInCksum;
-
- // if there is space in the last block, then we have to
- // append to that block
- if (freeInLastBlock > blockSize) {
- throw new IOException("The last block for file " +
- src + " is full.");
- }
-
- if (usedInCksum > 0 && freeInCksum > 0) {
- // if there is space in the last partial chunk, then
- // setup in such a way that the next packet will have only
- // one chunk that fills up the partial chunk.
- //
- computePacketChunkSize(0, freeInCksum);
- resetChecksumChunk(freeInCksum);
- appendChunk = true;
- } else {
- // if the remaining space in the block is smaller than
- // that expected size of of a packet, then create
- // smaller size packet.
- //
- computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock),
- bytesPerChecksum);
- }
-
- // setup pipeline to append to the last block XXX retries??
- nodes = lastBlock.getLocations();
- errorIndex = -1; // no errors yet.
- if (nodes.length < 1) {
- throw new IOException("Unable to retrieve blocks locations " +
- " for last block " + block +
- "of file " + src);
-
- }
- processDatanodeError(true, true);
+ Block getBlock() {
+ return block;
}
DatanodeInfo[] getNodes() {
return nodes;
}
- AccessToken getAccessToken() {
+ BlockAccessToken getAccessToken() {
return accessToken;
}
@@ -3058,10 +3359,10 @@
/**
* Create a new output stream to the given DataNode.
- * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+ * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
*/
DFSOutputStream(String src, FsPermission masked, EnumSet<CreateFlag> flag,
- short replication, long blockSize, Progressable progress,
+ boolean createParent, short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException {
this(src, blockSize, progress, bytesPerChecksum);
@@ -3069,12 +3370,15 @@
try {
namenode.create(
- src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), replication, blockSize);
+ src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
+ streamer = new DataStreamer();
streamer.start();
}
@@ -3094,9 +3398,10 @@
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
- streamer.initAppend(lastBlock, stat, bytesPerChecksum);
+ streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else {
computePacketChunkSize(writePacketSize, bytesPerChecksum);
+ streamer = new DataStreamer();
}
streamer.start();
}
@@ -3185,36 +3490,56 @@
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
- //
- // if we allocated a new packet because we encountered a block
- // boundary, reset bytesCurBlock.
- //
- if (bytesCurBlock == blockSize) {
- currentPacket.lastPacketInBlock = true;
- bytesCurBlock = 0;
- lastFlushOffset = -1;
- }
waitAndQueuePacket(currentPacket);
currentPacket = null;
- // If this was the first write after reopening a file, then the above
- // write filled up any partial chunk. Tell the summer to generate full
+ // If the reopened file did not end at chunk boundary and the above
+ // write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on.
- if (appendChunk) {
+ if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
resetChecksumChunk(bytesPerChecksum);
}
- int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
- computePacketChunkSize(psize, bytesPerChecksum);
+
+ if (!appendChunk) {
+ int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
+ computePacketChunkSize(psize, bytesPerChecksum);
+ }
+ //
+ // if encountering a block boundary, send an empty packet to
+ // indicate the end of block and reset bytesCurBlock.
+ //
+ if (bytesCurBlock == blockSize) {
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ bytesCurBlock);
+ currentPacket.lastPacketInBlock = true;
+ waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ bytesCurBlock = 0;
+ lastFlushOffset = -1;
+ }
}
}
+ @Override
+ @Deprecated
+ public synchronized void sync() throws IOException {
+ hflush();
+ }
+
/**
- * All data is written out to datanodes. It is not guaranteed
- * that data has been flushed to persistent store on the
- * datanode. Block allocations are persisted on namenode.
+ * Flushes out to all replicas of the block.
+ * The data is in the buffers of the DNs
+ * but not necessarily in the DN's OS buffers.
+ *
+ * It is a synchronous operation. When it returns,
+ * it guarantees that flushed data become visible to new readers.
+ * It is not guaranteed that data has been flushed to
+ * persistent store on the datanode.
+ * Block allocations are persisted on namenode.
*/
- public synchronized void sync() throws IOException {
+ @Override
+ public synchronized void hflush() throws IOException {
checkOpen();
isClosed();
try {
@@ -3258,12 +3583,24 @@
}
} catch (IOException e) {
lastException = new IOException("IOException flush:" + e);
- closeThreads();
+ closeThreads(true);
throw e;
}
}
/**
+ * The expected semantics is that all data has been flushed out to all replicas
+ * and all replicas have done the posix fsync equivalent - i.e. the OS has
+ * flushed it to the disk device (but the disk may have it in its cache).
+ *
+ * Right now by default it is implemented as hflush
+ */
+ @Override
+ public synchronized void hsync() throws IOException {
+ hflush();
+ }
+
+ /**
* Waits till all existing data is flushed and confirmations
* received from datanodes.
*/
@@ -3299,13 +3636,14 @@
}
streamer.setLastException(new IOException("Lease timeout of " +
(hdfsTimeout/1000) + " seconds expired."));
- closeThreads();
+ closeThreads(true);
}
// shutdown datastreamer and responseprocessor threads.
- private void closeThreads() throws IOException {
+ // interrupt datastreamer if force is true
+ private void closeThreads(boolean force) throws IOException {
try {
- streamer.close();
+ streamer.close(force);
streamer.join();
if (s != null) {
s.close();
@@ -3336,21 +3674,22 @@
try {
flushBuffer(); // flush from all upper layers
- // Mark that this packet is the last packet in block.
- // If there are no outstanding packets and the last packet
- // was not the last one in the current block, then create a
- // packet with empty payload.
- if (currentPacket == null && bytesCurBlock != 0) {
- currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
- }
if (currentPacket != null) {
+ waitAndQueuePacket(currentPacket);
+ }
+
+ if (bytesCurBlock != 0) {
+ // send an empty packet to mark the end of the block
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ bytesCurBlock);
currentPacket.lastPacketInBlock = true;
}
flushInternal(); // flush all data to Datanodes
- closeThreads();
- completeFile();
+ // get last block before destroying the streamer
+ Block lastBlock = streamer.getBlock();
+ closeThreads(false);
+ completeFile(lastBlock);
leasechecker.remove(src);
} finally {
closed = true;
@@ -3359,11 +3698,11 @@
// should be called holding (this) lock since setTestFilename() may
// be called during unit tests
- private void completeFile() throws IOException {
+ private void completeFile(Block last) throws IOException {
long localstart = System.currentTimeMillis();
boolean fileComplete = false;
while (!fileComplete) {
- fileComplete = namenode.complete(src, clientName);
+ fileComplete = namenode.complete(src, clientName, last);
if (!fileComplete) {
if (!clientRunning ||
(hdfsTimeout > 0 &&
@@ -3411,7 +3750,7 @@
/**
* Returns the access token currently used by streamer, for testing only
*/
- AccessToken getAccessToken() {
+ BlockAccessToken getAccessToken() {
return streamer.getAccessToken();
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Sat Nov 28 20:05:56 2009
@@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.Options;
/****************************************************************
@@ -177,12 +178,20 @@
}
return dfs.getBlockLocations(getPathName(file.getPath()), start, len);
}
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p,
+ long start, long len) throws IOException {
+ return dfs.getBlockLocations(getPathName(p), start, len);
+
+ }
@Override
public void setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
}
+ @SuppressWarnings("deprecation")
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return new DFSClient.DFSDataInputStream(
@@ -203,11 +212,33 @@
EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
- return new FSDataOutputStream
- (dfs.create(getPathName(f), permission,
+ return new FSDataOutputStream(dfs.create(getPathName(f), permission,
flag, replication, blockSize, progress, bufferSize),
statistics);
}
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected FSDataOutputStream primitiveCreate(Path f,
+ FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
+ short replication, long blockSize, Progressable progress,
+ int bytesPerChecksum) throws IOException {
+ return new FSDataOutputStream(dfs.primitiveCreate(getPathName(f),
+ absolutePermission, flag, true, replication, blockSize,
+ progress, bufferSize, bytesPerChecksum),statistics);
+ }
+
+ /**
+ * Same as create(), except fails if parent directory doesn't already exist.
+ * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable)
+ */
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ EnumSet<CreateFlag> flag, int bufferSize, short replication,
+ long blockSize, Progressable progress) throws IOException {
+
+ return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
+ false, replication, blockSize, progress, bufferSize), statistics);
+ }
@Override
public boolean setReplication(Path src,
@@ -215,19 +246,41 @@
) throws IOException {
return dfs.setReplication(getPathName(src), replication);
}
-
+
/**
- * Rename files/dirs
+ * This is a DFS-only operation; it is not part of FileSystem.
+ * Moves blocks from srcs to trg
+ * and deletes srcs afterwards.
+ * All blocks should be the same size.
+ * @param trg existing file to append to
+ * @param psrcs list of files (same block size, same replication)
+ * @throws IOException
*/
+ public void concat(Path trg, Path [] psrcs) throws IOException {
+ String [] srcs = new String [psrcs.length];
+ for(int i=0; i<psrcs.length; i++) {
+ srcs[i] = getPathName(psrcs[i]);
+ }
+ dfs.concat(getPathName(trg), srcs);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
@Override
public boolean rename(Path src, Path dst) throws IOException {
return dfs.rename(getPathName(src), getPathName(dst));
}
- /**
- * requires a boolean check to delete a non
- * empty directory recursively.
+ /**
+ * {@inheritDoc}
+ * This rename operation is guaranteed to be atomic.
*/
+ @SuppressWarnings("deprecation")
+ @Override
+ public void rename(Path src, Path dst, Options.Rename... options) throws IOException {
+ dfs.rename(getPathName(src), getPathName(dst), options);
+ }
+
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return dfs.delete(getPathName(f), recursive);
@@ -268,9 +321,24 @@
return stats;
}
+ /**
+ * Create a directory with given name and permission, only when
+ * parent directory exists.
+ */
+ public boolean mkdir(Path f, FsPermission permission) throws IOException {
+ return dfs.mkdirs(getPathName(f), permission, false);
+ }
+
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
- return dfs.mkdirs(getPathName(f), permission);
+ return dfs.mkdirs(getPathName(f), permission, true);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
+ throws IOException {
+ return dfs.primitiveMkdir(getPathName(f), absolutePermission);
}
/** {@inheritDoc} */
@@ -434,6 +502,12 @@
dfs.metaSave(pathname);
}
+ /** {@inheritDoc} */
+ @Override
+ public FsServerDefaults getServerDefaults() throws IOException {
+ return dfs.getServerDefaults();
+ }
+
/**
* We need to find the blocks that didn't match. Likely only one
* is corrupt but we will report both to the namenode. In the future,
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Sat Nov 28 20:05:56 2009
@@ -27,9 +27,11 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Random;
+import java.util.TimeZone;
import javax.security.auth.login.LoginException;
@@ -37,15 +39,12 @@
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.common.ThreadLocalDateFormat;
-import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -59,6 +58,8 @@
import org.xml.sax.helpers.DefaultHandler;
import org.xml.sax.helpers.XMLReaderFactory;
+
+
/** An implementation of a protocol for accessing filesystems over HTTP.
* The following implementation provides a limited, read-only interface
* to a filesystem over HTTP.
@@ -74,7 +75,21 @@
protected UserGroupInformation ugi;
protected final Random ran = new Random();
- protected static final ThreadLocalDateFormat df = ListPathsServlet.df;
+ public static final String HFTP_TIMEZONE = "UTC";
+ public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+
+ public static final SimpleDateFormat getDateFormat() {
+ final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
+ df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
+ return df;
+ }
+
+ protected static final ThreadLocal<SimpleDateFormat> df =
+ new ThreadLocal<SimpleDateFormat>() {
+ protected SimpleDateFormat initialValue() {
+ return getDateFormat();
+ }
+ };
@Override
public void initialize(URI name, Configuration conf) throws IOException {
@@ -100,55 +115,48 @@
}
}
- /**
- * Open an HTTP connection to the namenode to read file data and metadata.
- * @param path The path component of the URL
- * @param query The query component of the URL
- */
- protected HttpURLConnection openConnection(String path, String query)
- throws IOException {
+
+ /*
+ Construct URL pointing to file on namenode
+ */
+ URL getNamenodeFileURL(Path f) throws IOException {
+ return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi);
+ }
+
+ /*
+ Construct URL pointing to namenode.
+ */
+ URL getNamenodeURL(String path, String query) throws IOException {
try {
final URL url = new URI("http", null, nnAddr.getHostName(),
nnAddr.getPort(), path, query, null).toURL();
if (LOG.isTraceEnabled()) {
LOG.trace("url=" + url);
}
- HttpURLConnection connection = (HttpURLConnection)url.openConnection();
- connection.setRequestMethod("GET");
- connection.connect();
- return connection;
+ return url;
} catch (URISyntaxException e) {
- throw (IOException)new IOException().initCause(e);
+ throw new IOException(e);
}
}
+ /**
+ * Open an HTTP connection to the namenode to read file data and metadata.
+ * @param path The path component of the URL
+ * @param query The query component of the URL
+ */
+ protected HttpURLConnection openConnection(String path, String query)
+ throws IOException {
+ final URL url = getNamenodeURL(path, query);
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.connect();
+ return connection;
+ }
+
@Override
public FSDataInputStream open(Path f, int buffersize) throws IOException {
- HttpURLConnection connection = null;
- connection = openConnection("/data" + f.toUri().getPath(), "ugi=" + ugi);
- final InputStream in = connection.getInputStream();
- return new FSDataInputStream(new FSInputStream() {
- public int read() throws IOException {
- return in.read();
- }
- public int read(byte[] b, int off, int len) throws IOException {
- return in.read(b, off, len);
- }
-
- public void close() throws IOException {
- in.close();
- }
-
- public void seek(long pos) throws IOException {
- throw new IOException("Can't seek!");
- }
- public long getPos() throws IOException {
- throw new IOException("Position unknown!");
- }
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
- });
+ URL u = getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi);
+ return new FSDataInputStream(new ByteRangeInputStream(u));
}
/** Class to parse and store a listing reply from the server. */
@@ -168,10 +176,11 @@
long modif;
long atime = 0;
try {
- modif = df.parse(attrs.getValue("modified")).getTime();
+ final SimpleDateFormat ldf = df.get();
+ modif = ldf.parse(attrs.getValue("modified")).getTime();
String astr = attrs.getValue("accesstime");
if (astr != null) {
- atime = df.parse(astr).getTime();
+ atime = ldf.parse(astr).getTime();
}
} catch (ParseException e) { throw new SAXException(e); }
FileStatus fs = "file".equals(qname)
@@ -286,7 +295,7 @@
@Override
public Path getWorkingDirectory() {
- return new Path("/").makeQualified(this);
+ return new Path("/").makeQualified(getUri(), null);
}
@Override
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java Sat Nov 28 20:05:56 2009
@@ -65,9 +65,9 @@
* @throws IOException
*/
private static void setupSsl(Configuration conf) throws IOException {
- Configuration sslConf = new Configuration(false);
- sslConf.addResource(conf.get("dfs.https.client.keystore.resource",
- "ssl-client.xml"));
+ Configuration sslConf = new HdfsConfiguration(false);
+ sslConf.addResource(conf.get(DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
+ DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
FileInputStream fis = null;
try {
SSLContext sc = SSLContext.getInstance("SSL");
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java Sat Nov 28 20:05:56 2009
@@ -40,10 +40,6 @@
});
}
- // generation stamp of blocks that pre-date the introduction of
- // a generation stamp.
- public static final long GRANDFATHER_GENERATION_STAMP = 0;
-
public static final Pattern blockFilePattern = Pattern
.compile(BLOCK_FILE_PREFIX + "(-??\\d++)$");
public static final Pattern metaFilePattern = Pattern
@@ -70,7 +66,7 @@
public static long getGenerationStamp(String metaFile) {
Matcher m = metaFilePattern.matcher(metaFile);
return m.matches() ? Long.parseLong(m.group(2))
- : GRANDFATHER_GENERATION_STAMP;
+ : GenerationStamp.GRANDFATHER_GENERATION_STAMP;
}
/**
@@ -91,9 +87,13 @@
set(blkid, len, generationStamp);
}
- public Block(final long blkid) {this(blkid, 0, GenerationStamp.WILDCARD_STAMP);}
+ public Block(final long blkid) {
+ this(blkid, 0, GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+ }
- public Block(Block blk) {this(blk.blockId, blk.numBytes, blk.generationStamp);}
+ public Block(Block blk) {
+ this(blk.blockId, blk.numBytes, blk.generationStamp);
+ }
/**
* Find the blockid from the given filename
@@ -164,32 +164,13 @@
}
}
- /////////////////////////////////////
- // Comparable
- /////////////////////////////////////
- static void validateGenerationStamp(long generationstamp) {
- if (generationstamp == GenerationStamp.WILDCARD_STAMP) {
- throw new IllegalStateException("generationStamp (=" + generationstamp
- + ") == GenerationStamp.WILDCARD_STAMP");
- }
- }
-
- /** {@inheritDoc} */
+ @Override // Comparable
public int compareTo(Block b) {
- //Wildcard generationStamp is NOT ALLOWED here
- validateGenerationStamp(this.generationStamp);
- validateGenerationStamp(b.generationStamp);
-
- if (blockId < b.blockId) {
- return -1;
- } else if (blockId == b.blockId) {
- return GenerationStamp.compare(generationStamp, b.generationStamp);
- } else {
- return 1;
- }
+ return blockId < b.blockId ? -1 :
+ blockId > b.blockId ? 1 : 0;
}
- /** {@inheritDoc} */
+ @Override // Object
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -197,16 +178,12 @@
if (!(o instanceof Block)) {
return false;
}
- final Block that = (Block)o;
- //Wildcard generationStamp is ALLOWED here
- return this.blockId == that.blockId
- && GenerationStamp.equalsWithWildcard(
- this.generationStamp, that.generationStamp);
+ return compareTo((Block)o) == 0;
}
- /** {@inheritDoc} */
+ @Override // Object
public int hashCode() {
//GenerationStamp is IRRELEVANT and should not be used here
- return 37 * 17 + (int) (blockId^(blockId>>>32));
+ return (int)(blockId^(blockId>>>32));
}
}