You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC
svn commit: r529410 [4/27] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Apr 16 14:44:35 2007
@@ -42,1400 +42,1400 @@
* @author Mike Cafarella, Tessa MacDuff
********************************************************/
class DFSClient implements FSConstants {
- public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
- static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
- private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
- private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
- ClientProtocol namenode;
- boolean running = true;
- Random r = new Random();
- String clientName;
- Daemon leaseChecker;
- private Configuration conf;
- private long defaultBlockSize;
- private short defaultReplication;
+ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
+ static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
+ private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+ private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+ ClientProtocol namenode;
+ boolean running = true;
+ Random r = new Random();
+ String clientName;
+ Daemon leaseChecker;
+ private Configuration conf;
+ private long defaultBlockSize;
+ private short defaultReplication;
- /**
- * A map from name -> DFSOutputStream of files that are currently being
- * written by this client.
- */
- private TreeMap pendingCreates = new TreeMap();
+ /**
+ * A map from name -> DFSOutputStream of files that are currently being
+ * written by this client.
+ */
+ private TreeMap pendingCreates = new TreeMap();
- /**
- * A class to track the list of DFS clients, so that they can be closed
- * on exit.
- * @author Owen O'Malley
- */
- private static class ClientFinalizer extends Thread {
- private List clients = new ArrayList();
-
- public synchronized void addClient(DFSClient client) {
- clients.add(client);
- }
-
- public synchronized void run() {
- Iterator itr = clients.iterator();
- while (itr.hasNext()) {
- DFSClient client = (DFSClient) itr.next();
- if (client.running) {
- try {
- client.close();
- } catch (IOException ie) {
- System.err.println("Error closing client");
- ie.printStackTrace();
- }
+ /**
+ * A class to track the list of DFS clients, so that they can be closed
+ * on exit.
+ * @author Owen O'Malley
+ */
+ private static class ClientFinalizer extends Thread {
+ private List clients = new ArrayList();
+
+ public synchronized void addClient(DFSClient client) {
+ clients.add(client);
+ }
+
+ public synchronized void run() {
+ Iterator itr = clients.iterator();
+ while (itr.hasNext()) {
+ DFSClient client = (DFSClient) itr.next();
+ if (client.running) {
+ try {
+ client.close();
+ } catch (IOException ie) {
+ System.err.println("Error closing client");
+ ie.printStackTrace();
}
}
}
}
+ }
- // add a cleanup thread
- private static ClientFinalizer clientFinalizer = new ClientFinalizer();
- static {
- Runtime.getRuntime().addShutdownHook(clientFinalizer);
- }
+ // add a cleanup thread
+ private static ClientFinalizer clientFinalizer = new ClientFinalizer();
+ static {
+ Runtime.getRuntime().addShutdownHook(clientFinalizer);
+ }
- /**
- * Create a new DFSClient connected to the given namenode server.
- */
- public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
+ /**
+ * Create a new DFSClient connected to the given namenode server.
+ */
+ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
throws IOException {
- this.conf = conf;
- this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, nameNodeAddr, conf);
- String taskId = conf.get("mapred.task.id");
- if (taskId != null) {
- this.clientName = "DFSClient_" + taskId;
- } else {
- this.clientName = "DFSClient_" + r.nextInt();
- }
- defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
- defaultReplication = (short) conf.getInt("dfs.replication", 3);
- this.leaseChecker = new Daemon(new LeaseChecker());
- this.leaseChecker.start();
- }
-
- private void checkOpen() throws IOException {
- if (!running) {
- IOException result = new IOException("Filesystem closed");
- throw result;
- }
+ this.conf = conf;
+ this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+ ClientProtocol.versionID, nameNodeAddr, conf);
+ String taskId = conf.get("mapred.task.id");
+ if (taskId != null) {
+ this.clientName = "DFSClient_" + taskId;
+ } else {
+ this.clientName = "DFSClient_" + r.nextInt();
+ }
+ defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ defaultReplication = (short) conf.getInt("dfs.replication", 3);
+ this.leaseChecker = new Daemon(new LeaseChecker());
+ this.leaseChecker.start();
+ }
+
+ private void checkOpen() throws IOException {
+ if (!running) {
+ IOException result = new IOException("Filesystem closed");
+ throw result;
}
+ }
- /**
- * Close the file system, abadoning all of the leases and files being
- * created.
- */
- public void close() throws IOException {
- // synchronize in here so that we don't need to change the API
- synchronized (this) {
- checkOpen();
- synchronized (pendingCreates) {
- Iterator file_itr = pendingCreates.keySet().iterator();
- while (file_itr.hasNext()) {
- String name = (String) file_itr.next();
- try {
- namenode.abandonFileInProgress(name, clientName);
- } catch (IOException ie) {
- System.err.println("Exception abandoning create lock on " + name);
- ie.printStackTrace();
- }
+ /**
+ * Close the file system, abadoning all of the leases and files being
+ * created.
+ */
+ public void close() throws IOException {
+ // synchronize in here so that we don't need to change the API
+ synchronized (this) {
+ checkOpen();
+ synchronized (pendingCreates) {
+ Iterator file_itr = pendingCreates.keySet().iterator();
+ while (file_itr.hasNext()) {
+ String name = (String) file_itr.next();
+ try {
+ namenode.abandonFileInProgress(name, clientName);
+ } catch (IOException ie) {
+ System.err.println("Exception abandoning create lock on " + name);
+ ie.printStackTrace();
}
- pendingCreates.clear();
- }
- this.running = false;
- try {
- leaseChecker.join();
- } catch (InterruptedException ie) {
}
+ pendingCreates.clear();
+ }
+ this.running = false;
+ try {
+ leaseChecker.join();
+ } catch (InterruptedException ie) {
}
}
+ }
- /**
- * Get the default block size for this cluster
- * @return the default block size in bytes
- */
- public long getDefaultBlockSize() {
- return defaultBlockSize;
- }
+ /**
+ * Get the default block size for this cluster
+ * @return the default block size in bytes
+ */
+ public long getDefaultBlockSize() {
+ return defaultBlockSize;
+ }
- public long getBlockSize(UTF8 f) throws IOException {
- int retries = 4;
- while (true) {
- try {
- return namenode.getBlockSize(f.toString());
- } catch (IOException ie) {
- if (--retries == 0) {
- LOG.warn("Problem getting block size: " +
- StringUtils.stringifyException(ie));
- throw ie;
- }
- LOG.debug("Problem getting block size: " +
- StringUtils.stringifyException(ie));
+ public long getBlockSize(UTF8 f) throws IOException {
+ int retries = 4;
+ while (true) {
+ try {
+ return namenode.getBlockSize(f.toString());
+ } catch (IOException ie) {
+ if (--retries == 0) {
+ LOG.warn("Problem getting block size: " +
+ StringUtils.stringifyException(ie));
+ throw ie;
}
+ LOG.debug("Problem getting block size: " +
+ StringUtils.stringifyException(ie));
}
}
+ }
- /**
- * Report corrupt blocks that were discovered by the client.
- */
- public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
- namenode.reportBadBlocks(blocks);
- }
+ /**
+ * Report corrupt blocks that were discovered by the client.
+ */
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+ namenode.reportBadBlocks(blocks);
+ }
- public short getDefaultReplication() {
- return defaultReplication;
- }
+ public short getDefaultReplication() {
+ return defaultReplication;
+ }
- /**
- * Get hints about the location of the indicated block(s). The
- * array returned is as long as there are blocks in the indicated
- * range. Each block may have one or more locations.
- */
- public String[][] getHints(UTF8 src, long start, long len) throws IOException {
- return namenode.getHints(src.toString(), start, len);
- }
-
- /**
- * Create an input stream that obtains a nodelist from the
- * namenode, and then reads from all the right places. Creates
- * inner subclass of InputStream that does the right out-of-band
- * work.
- */
- public DFSInputStream open(UTF8 src) throws IOException {
- checkOpen();
- // Get block info from namenode
- return new DFSInputStream(src.toString());
- }
-
- /**
- * Create a new dfs file and return an output stream for writing into it.
- *
- * @param src stream name
- * @param overwrite do not check for file existence if true
- * @return output stream
- * @throws IOException
- */
- public OutputStream create( UTF8 src,
- boolean overwrite
- ) throws IOException {
- return create( src, overwrite, defaultReplication, defaultBlockSize, null);
- }
+ /**
+ * Get hints about the location of the indicated block(s). The
+ * array returned is as long as there are blocks in the indicated
+ * range. Each block may have one or more locations.
+ */
+ public String[][] getHints(UTF8 src, long start, long len) throws IOException {
+ return namenode.getHints(src.toString(), start, len);
+ }
+
+ /**
+ * Create an input stream that obtains a nodelist from the
+ * namenode, and then reads from all the right places. Creates
+ * inner subclass of InputStream that does the right out-of-band
+ * work.
+ */
+ public DFSInputStream open(UTF8 src) throws IOException {
+ checkOpen();
+ // Get block info from namenode
+ return new DFSInputStream(src.toString());
+ }
+
+ /**
+ * Create a new dfs file and return an output stream for writing into it.
+ *
+ * @param src stream name
+ * @param overwrite do not check for file existence if true
+ * @return output stream
+ * @throws IOException
+ */
+ public OutputStream create( UTF8 src,
+ boolean overwrite
+ ) throws IOException {
+ return create( src, overwrite, defaultReplication, defaultBlockSize, null);
+ }
- /**
- * Create a new dfs file and return an output stream for writing into it
- * with write-progress reporting.
- *
- * @param src stream name
- * @param overwrite do not check for file existence if true
- * @return output stream
- * @throws IOException
- */
- public OutputStream create( UTF8 src,
- boolean overwrite,
- Progressable progress
- ) throws IOException {
- return create( src, overwrite, defaultReplication, defaultBlockSize, null);
- }
+ /**
+ * Create a new dfs file and return an output stream for writing into it
+ * with write-progress reporting.
+ *
+ * @param src stream name
+ * @param overwrite do not check for file existence if true
+ * @return output stream
+ * @throws IOException
+ */
+ public OutputStream create( UTF8 src,
+ boolean overwrite,
+ Progressable progress
+ ) throws IOException {
+ return create( src, overwrite, defaultReplication, defaultBlockSize, null);
+ }
- /**
- * Create a new dfs file with the specified block replication
- * and return an output stream for writing into the file.
- *
- * @param src stream name
- * @param overwrite do not check for file existence if true
- * @param replication block replication
- * @return output stream
- * @throws IOException
- */
- public OutputStream create( UTF8 src,
- boolean overwrite,
- short replication,
- long blockSize
+ /**
+ * Create a new dfs file with the specified block replication
+ * and return an output stream for writing into the file.
+ *
+ * @param src stream name
+ * @param overwrite do not check for file existence if true
+ * @param replication block replication
+ * @return output stream
+ * @throws IOException
+ */
+ public OutputStream create( UTF8 src,
+ boolean overwrite,
+ short replication,
+ long blockSize
+ ) throws IOException {
+ return create(src, overwrite, replication, blockSize, null);
+ }
+
+ /**
+ * Create a new dfs file with the specified block replication
+ * with write-progress reporting and return an output stream for writing
+ * into the file.
+ *
+ * @param src stream name
+ * @param overwrite do not check for file existence if true
+ * @param replication block replication
+ * @return output stream
+ * @throws IOException
+ */
+ public OutputStream create( UTF8 src,
+ boolean overwrite,
+ short replication,
+ long blockSize,
+ Progressable progress
+ ) throws IOException {
+ checkOpen();
+ OutputStream result = new DFSOutputStream(src, overwrite,
+ replication, blockSize, progress);
+ synchronized (pendingCreates) {
+ pendingCreates.put(src.toString(), result);
+ }
+ return result;
+ }
+ /**
+ * Set replication for an existing file.
+ *
+ * @see ClientProtocol#setReplication(String, short)
+ * @param replication
+ * @throws IOException
+ * @return true is successful or false if file does not exist
+ * @author shv
+ */
+ public boolean setReplication(UTF8 src,
+ short replication
) throws IOException {
- return create(src, overwrite, replication, blockSize, null);
- }
+ return namenode.setReplication(src.toString(), replication);
+ }
- /**
- * Create a new dfs file with the specified block replication
- * with write-progress reporting and return an output stream for writing
- * into the file.
- *
- * @param src stream name
- * @param overwrite do not check for file existence if true
- * @param replication block replication
- * @return output stream
- * @throws IOException
- */
- public OutputStream create( UTF8 src,
- boolean overwrite,
- short replication,
- long blockSize,
- Progressable progress
- ) throws IOException {
- checkOpen();
- OutputStream result = new DFSOutputStream(src, overwrite,
- replication, blockSize, progress);
- synchronized (pendingCreates) {
- pendingCreates.put(src.toString(), result);
+ /**
+ * Make a direct connection to namenode and manipulate structures
+ * there.
+ */
+ public boolean rename(UTF8 src, UTF8 dst) throws IOException {
+ checkOpen();
+ return namenode.rename(src.toString(), dst.toString());
+ }
+
+ /**
+ * Make a direct connection to namenode and manipulate structures
+ * there.
+ */
+ public boolean delete(UTF8 src) throws IOException {
+ checkOpen();
+ return namenode.delete(src.toString());
+ }
+
+ /**
+ */
+ public boolean exists(UTF8 src) throws IOException {
+ checkOpen();
+ return namenode.exists(src.toString());
+ }
+
+ /**
+ */
+ public boolean isDirectory(UTF8 src) throws IOException {
+ checkOpen();
+ return namenode.isDir(src.toString());
+ }
+
+ /**
+ */
+ public DFSFileInfo[] listPaths(UTF8 src) throws IOException {
+ checkOpen();
+ return namenode.getListing(src.toString());
+ }
+
+ /**
+ */
+ public long totalRawCapacity() throws IOException {
+ long rawNums[] = namenode.getStats();
+ return rawNums[0];
+ }
+
+ /**
+ */
+ public long totalRawUsed() throws IOException {
+ long rawNums[] = namenode.getStats();
+ return rawNums[1];
+ }
+
+ public DatanodeInfo[] datanodeReport() throws IOException {
+ return namenode.getDatanodeReport();
+ }
+
+ /**
+ * Enter, leave or get safe mode.
+ * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)}
+ * for more details.
+ *
+ * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+ */
+ public boolean setSafeMode( SafeModeAction action ) throws IOException {
+ return namenode.setSafeMode( action );
+ }
+
+ /**
+ * Refresh the hosts and exclude files. (Rereads them.)
+ * See {@link ClientProtocol#refreshNodes()}
+ * for more details.
+ *
+ * @see ClientProtocol#refreshNodes()
+ */
+ public void refreshNodes() throws IOException {
+ namenode.refreshNodes();
+ }
+
+ /**
+ * Dumps DFS data structures into specified file.
+ * See {@link ClientProtocol#metaSave()}
+ * for more details.
+ *
+ * @see ClientProtocol#metaSave()
+ */
+ public void metaSave(String pathname) throws IOException {
+ namenode.metaSave(pathname);
+ }
+
+ /**
+ * @see ClientProtocol#finalizeUpgrade()
+ */
+ public void finalizeUpgrade() throws IOException {
+ namenode.finalizeUpgrade();
+ }
+
+ /**
+ */
+ public boolean mkdirs(UTF8 src) throws IOException {
+ checkOpen();
+ return namenode.mkdirs(src.toString());
+ }
+
+ /**
+ */
+ public void lock(UTF8 src, boolean exclusive) throws IOException {
+ long start = System.currentTimeMillis();
+ boolean hasLock = false;
+ while (! hasLock) {
+ hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);
+ if (! hasLock) {
+ try {
+ Thread.sleep(400);
+ if (System.currentTimeMillis() - start > 5000) {
+ LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
+ Thread.sleep(2000);
+ }
+ } catch (InterruptedException ie) {
+ }
}
- return result;
- }
- /**
- * Set replication for an existing file.
- *
- * @see ClientProtocol#setReplication(String, short)
- * @param replication
- * @throws IOException
- * @return true is successful or false if file does not exist
- * @author shv
- */
- public boolean setReplication(UTF8 src,
- short replication
- ) throws IOException {
- return namenode.setReplication(src.toString(), replication);
}
+ }
- /**
- * Make a direct connection to namenode and manipulate structures
- * there.
- */
- public boolean rename(UTF8 src, UTF8 dst) throws IOException {
- checkOpen();
- return namenode.rename(src.toString(), dst.toString());
+ /**
+ *
+ */
+ public void release(UTF8 src) throws IOException {
+ boolean hasReleased = false;
+ while (! hasReleased) {
+ hasReleased = namenode.releaseLock(src.toString(), clientName);
+ if (! hasReleased) {
+ LOG.info("Could not release. Retrying...");
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ie) {
+ }
+ }
}
+ }
- /**
- * Make a direct connection to namenode and manipulate structures
- * there.
- */
- public boolean delete(UTF8 src) throws IOException {
- checkOpen();
- return namenode.delete(src.toString());
+ /**
+ * Pick the best node from which to stream the data.
+ * Entries in <i>nodes</i> are already in the priority order
+ */
+ private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
+ if (nodes != null) {
+ for (int i = 0; i < nodes.length; i++) {
+ if (!deadNodes.contains(nodes[i])) {
+ return nodes[i];
+ }
+ }
}
+ throw new IOException("No live nodes contain current block");
+ }
+ /***************************************************************
+ * Periodically check in with the namenode and renew all the leases
+ * when the lease period is half over.
+ ***************************************************************/
+ class LeaseChecker implements Runnable {
/**
*/
- public boolean exists(UTF8 src) throws IOException {
- checkOpen();
- return namenode.exists(src.toString());
+ public void run() {
+ long lastRenewed = 0;
+ while (running) {
+ if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
+ try {
+ if( pendingCreates.size() > 0 )
+ namenode.renewLease(clientName);
+ lastRenewed = System.currentTimeMillis();
+ } catch (IOException ie) {
+ String err = StringUtils.stringifyException(ie);
+ LOG.warn("Problem renewing lease for " + clientName +
+ ": " + err);
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ }
+ }
}
+ }
- /**
- */
- public boolean isDirectory(UTF8 src) throws IOException {
- checkOpen();
- return namenode.isDir(src.toString());
+ /** Utility class to encapsulate data node info and its ip address. */
+ private static class DNAddrPair {
+ DatanodeInfo info;
+ InetSocketAddress addr;
+ DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+ this.info = info;
+ this.addr = addr;
}
-
+ }
+
+ /****************************************************************
+ * DFSInputStream provides bytes from a named file. It handles
+ * negotiation of the namenode and various datanodes as necessary.
+ ****************************************************************/
+ class DFSInputStream extends FSInputStream {
+ private Socket s = null;
+ boolean closed = false;
+
+ private String src;
+ private DataInputStream blockStream;
+ private Block blocks[] = null;
+ private DatanodeInfo nodes[][] = null;
+ private DatanodeInfo currentNode = null;
+ private Block currentBlock = null;
+ private long pos = 0;
+ private long filelen = 0;
+ private long blockEnd = -1;
+ private TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
+
/**
*/
- public DFSFileInfo[] listPaths(UTF8 src) throws IOException {
- checkOpen();
- return namenode.getListing(src.toString());
+ public DFSInputStream(String src) throws IOException {
+ this.src = src;
+ openInfo();
+ this.blockStream = null;
+ for (int i = 0; i < blocks.length; i++) {
+ this.filelen += blocks[i].getNumBytes();
+ }
}
/**
+ * Grab the open-file info from namenode
*/
- public long totalRawCapacity() throws IOException {
- long rawNums[] = namenode.getStats();
- return rawNums[0];
- }
+ synchronized void openInfo() throws IOException {
+ Block oldBlocks[] = this.blocks;
- /**
- */
- public long totalRawUsed() throws IOException {
- long rawNums[] = namenode.getStats();
- return rawNums[1];
- }
+ LocatedBlock results[] = namenode.open(src);
+ Vector blockV = new Vector();
+ Vector nodeV = new Vector();
+ for (int i = 0; i < results.length; i++) {
+ blockV.add(results[i].getBlock());
+ nodeV.add(results[i].getLocations());
+ }
+ Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);
- public DatanodeInfo[] datanodeReport() throws IOException {
- return namenode.getDatanodeReport();
- }
-
- /**
- * Enter, leave or get safe mode.
- * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)}
- * for more details.
- *
- * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
- */
- public boolean setSafeMode( SafeModeAction action ) throws IOException {
- return namenode.setSafeMode( action );
+ if (oldBlocks != null) {
+ for (int i = 0; i < oldBlocks.length; i++) {
+ if (! oldBlocks[i].equals(newBlocks[i])) {
+ throw new IOException("Blocklist for " + src + " has changed!");
+ }
+ }
+ if (oldBlocks.length != newBlocks.length) {
+ throw new IOException("Blocklist for " + src + " now has different length");
+ }
+ }
+ this.blocks = newBlocks;
+ this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+ this.currentNode = null;
}
/**
- * Refresh the hosts and exclude files. (Rereads them.)
- * See {@link ClientProtocol#refreshNodes()}
- * for more details.
- *
- * @see ClientProtocol#refreshNodes()
+ * Returns the datanode from which the stream is currently reading.
*/
- public void refreshNodes() throws IOException {
- namenode.refreshNodes();
+ public DatanodeInfo getCurrentDatanode() {
+ return currentNode;
}
/**
- * Dumps DFS data structures into specified file.
- * See {@link ClientProtocol#metaSave()}
- * for more details.
- *
- * @see ClientProtocol#metaSave()
- */
- public void metaSave(String pathname) throws IOException {
- namenode.metaSave(pathname);
- }
-
- /**
- * @see ClientProtocol#finalizeUpgrade()
+ * Returns the block containing the target position.
*/
- public void finalizeUpgrade() throws IOException {
- namenode.finalizeUpgrade();
+ public Block getCurrentBlock() {
+ return currentBlock;
}
- /**
- */
- public boolean mkdirs(UTF8 src) throws IOException {
- checkOpen();
- return namenode.mkdirs(src.toString());
- }
/**
+ * Used by the automatic tests to detemine blocks locations of a
+ * file
*/
- public void lock(UTF8 src, boolean exclusive) throws IOException {
- long start = System.currentTimeMillis();
- boolean hasLock = false;
- while (! hasLock) {
- hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);
- if (! hasLock) {
- try {
- Thread.sleep(400);
- if (System.currentTimeMillis() - start > 5000) {
- LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
- Thread.sleep(2000);
- }
- } catch (InterruptedException ie) {
- }
- }
- }
+ synchronized DatanodeInfo[][] getDataNodes() {
+ return nodes;
}
/**
- *
+ * Open a DataInputStream to a DataNode so that it can be read from.
+ * We get block ID and the IDs of the destinations at startup, from the namenode.
*/
- public void release(UTF8 src) throws IOException {
- boolean hasReleased = false;
- while (! hasReleased) {
- hasReleased = namenode.releaseLock(src.toString(), clientName);
- if (! hasReleased) {
- LOG.info("Could not release. Retrying...");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
+ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
+ if (target >= filelen) {
+ throw new IOException("Attempted to read past end of file");
+ }
- /**
- * Pick the best node from which to stream the data.
- * Entries in <i>nodes</i> are already in the priority order
- */
- private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
- if (nodes != null) {
- for (int i = 0; i < nodes.length; i++) {
- if (!deadNodes.contains(nodes[i])) {
- return nodes[i];
- }
- }
+ if (s != null) {
+ s.close();
+ s = null;
}
- throw new IOException("No live nodes contain current block");
- }
- /***************************************************************
- * Periodically check in with the namenode and renew all the leases
- * when the lease period is half over.
- ***************************************************************/
- class LeaseChecker implements Runnable {
- /**
- */
- public void run() {
- long lastRenewed = 0;
- while (running) {
- if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
- try {
- if( pendingCreates.size() > 0 )
- namenode.renewLease(clientName);
- lastRenewed = System.currentTimeMillis();
- } catch (IOException ie) {
- String err = StringUtils.stringifyException(ie);
- LOG.warn("Problem renewing lease for " + clientName +
- ": " + err);
- }
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- }
- }
+ //
+ // Compute desired block
+ //
+ int targetBlock = -1;
+ long targetBlockStart = 0;
+ long targetBlockEnd = 0;
+ for (int i = 0; i < blocks.length; i++) {
+ long blocklen = blocks[i].getNumBytes();
+ targetBlockEnd = targetBlockStart + blocklen - 1;
+
+ if (target >= targetBlockStart && target <= targetBlockEnd) {
+ targetBlock = i;
+ break;
+ } else {
+ targetBlockStart = targetBlockEnd + 1;
}
- }
-
- /** Utility class to encapsulate data node info and its ip address. */
- private static class DNAddrPair {
- DatanodeInfo info;
- InetSocketAddress addr;
- DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
- this.info = info;
- this.addr = addr;
}
- }
-
- /****************************************************************
- * DFSInputStream provides bytes from a named file. It handles
- * negotiation of the namenode and various datanodes as necessary.
- ****************************************************************/
- class DFSInputStream extends FSInputStream {
- private Socket s = null;
- boolean closed = false;
-
- private String src;
- private DataInputStream blockStream;
- private Block blocks[] = null;
- private DatanodeInfo nodes[][] = null;
- private DatanodeInfo currentNode = null;
- private Block currentBlock = null;
- private long pos = 0;
- private long filelen = 0;
- private long blockEnd = -1;
- private TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-
- /**
- */
- public DFSInputStream(String src) throws IOException {
- this.src = src;
- openInfo();
- this.blockStream = null;
- for (int i = 0; i < blocks.length; i++) {
- this.filelen += blocks[i].getNumBytes();
- }
- }
-
- /**
- * Grab the open-file info from namenode
- */
- synchronized void openInfo() throws IOException {
- Block oldBlocks[] = this.blocks;
-
- LocatedBlock results[] = namenode.open(src);
- Vector blockV = new Vector();
- Vector nodeV = new Vector();
- for (int i = 0; i < results.length; i++) {
- blockV.add(results[i].getBlock());
- nodeV.add(results[i].getLocations());
- }
- Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);
+ if (targetBlock < 0) {
+ throw new IOException("Impossible situation: could not find target position " + target);
+ }
+ long offsetIntoBlock = target - targetBlockStart;
- if (oldBlocks != null) {
- for (int i = 0; i < oldBlocks.length; i++) {
- if (! oldBlocks[i].equals(newBlocks[i])) {
- throw new IOException("Blocklist for " + src + " has changed!");
- }
- }
- if (oldBlocks.length != newBlocks.length) {
- throw new IOException("Blocklist for " + src + " now has different length");
- }
- }
- this.blocks = newBlocks;
- this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
- this.currentNode = null;
- }
+ //
+ // Connect to best DataNode for desired Block, with potential offset
+ //
+ DatanodeInfo chosenNode = null;
+ while (s == null) {
+ DNAddrPair retval = chooseDataNode(targetBlock);
+ chosenNode = retval.info;
+ InetSocketAddress targetAddr = retval.addr;
- /**
- * Returns the datanode from which the stream is currently reading.
- */
- public DatanodeInfo getCurrentDatanode() {
- return currentNode;
- }
+ try {
+ s = new Socket();
+ s.connect(targetAddr, READ_TIMEOUT);
+ s.setSoTimeout(READ_TIMEOUT);
- /**
- * Returns the block containing the target position.
- */
- public Block getCurrentBlock() {
- return currentBlock;
- }
+ //
+ // Xmit header info to datanode
+ //
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+ out.write(OP_READSKIP_BLOCK);
+ blocks[targetBlock].write(out);
+ out.writeLong(offsetIntoBlock);
+ out.flush();
+ //
+ // Get bytes in block, set streams
+ //
+ DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+ long curBlockSize = in.readLong();
+ long amtSkipped = in.readLong();
+ if (curBlockSize != blocks[targetBlock].len) {
+ throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);
+ }
+ if (amtSkipped != offsetIntoBlock) {
+ throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
+ }
- /**
- * Used by the automatic tests to detemine blocks locations of a
- * file
- */
- synchronized DatanodeInfo[][] getDataNodes() {
- return nodes;
+ this.pos = target;
+ this.blockEnd = targetBlockEnd;
+ this.currentBlock = blocks[targetBlock];
+ this.blockStream = in;
+ return chosenNode;
+ } catch (IOException ex) {
+ // Put chosen node into dead list, continue
+ LOG.debug("Failed to connect to " + targetAddr + ":"
+ + StringUtils.stringifyException(ex));
+ deadNodes.add(chosenNode);
+ if (s != null) {
+ try {
+ s.close();
+ } catch (IOException iex) {
+ }
+ }
+ s = null;
}
+ }
+ return chosenNode;
+ }
- /**
- * Open a DataInputStream to a DataNode so that it can be read from.
- * We get block ID and the IDs of the destinations at startup, from the namenode.
- */
- private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
- if (target >= filelen) {
- throw new IOException("Attempted to read past end of file");
- }
-
- if (s != null) {
- s.close();
- s = null;
- }
-
- //
- // Compute desired block
- //
- int targetBlock = -1;
- long targetBlockStart = 0;
- long targetBlockEnd = 0;
- for (int i = 0; i < blocks.length; i++) {
- long blocklen = blocks[i].getNumBytes();
- targetBlockEnd = targetBlockStart + blocklen - 1;
-
- if (target >= targetBlockStart && target <= targetBlockEnd) {
- targetBlock = i;
- break;
- } else {
- targetBlockStart = targetBlockEnd + 1;
- }
- }
- if (targetBlock < 0) {
- throw new IOException("Impossible situation: could not find target position " + target);
- }
- long offsetIntoBlock = target - targetBlockStart;
-
- //
- // Connect to best DataNode for desired Block, with potential offset
- //
- DatanodeInfo chosenNode = null;
- while (s == null) {
- DNAddrPair retval = chooseDataNode(targetBlock);
- chosenNode = retval.info;
- InetSocketAddress targetAddr = retval.addr;
-
- try {
- s = new Socket();
- s.connect(targetAddr, READ_TIMEOUT);
- s.setSoTimeout(READ_TIMEOUT);
-
- //
- // Xmit header info to datanode
- //
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
- out.write(OP_READSKIP_BLOCK);
- blocks[targetBlock].write(out);
- out.writeLong(offsetIntoBlock);
- out.flush();
-
- //
- // Get bytes in block, set streams
- //
- DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
- long curBlockSize = in.readLong();
- long amtSkipped = in.readLong();
- if (curBlockSize != blocks[targetBlock].len) {
- throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);
- }
- if (amtSkipped != offsetIntoBlock) {
- throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
- }
-
- this.pos = target;
- this.blockEnd = targetBlockEnd;
- this.currentBlock = blocks[targetBlock];
- this.blockStream = in;
- return chosenNode;
- } catch (IOException ex) {
- // Put chosen node into dead list, continue
- LOG.debug("Failed to connect to " + targetAddr + ":"
- + StringUtils.stringifyException(ex));
- deadNodes.add(chosenNode);
- if (s != null) {
- try {
- s.close();
- } catch (IOException iex) {
- }
- }
- s = null;
- }
- }
- return chosenNode;
- }
+ /**
+ * Close it down!
+ */
+ public synchronized void close() throws IOException {
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
- /**
- * Close it down!
- */
- public synchronized void close() throws IOException {
- checkOpen();
- if (closed) {
- throw new IOException("Stream closed");
- }
+ if (s != null) {
+ blockStream.close();
+ s.close();
+ s = null;
+ }
+ super.close();
+ closed = true;
+ }
- if (s != null) {
- blockStream.close();
- s.close();
- s = null;
- }
- super.close();
- closed = true;
+ /**
+ * Basic read()
+ */
+ public synchronized int read() throws IOException {
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ int result = -1;
+ if (pos < filelen) {
+ if (pos > blockEnd) {
+ currentNode = blockSeekTo(pos);
+ }
+ result = blockStream.read();
+ if (result >= 0) {
+ pos++;
}
+ }
+ return result;
+ }
- /**
- * Basic read()
- */
- public synchronized int read() throws IOException {
- checkOpen();
- if (closed) {
- throw new IOException("Stream closed");
+ /**
+ * Read the entire buffer.
+ */
+ public synchronized int read(byte buf[], int off, int len) throws IOException {
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ if (pos < filelen) {
+ int retries = 2;
+ while (retries > 0) {
+ try {
+ if (pos > blockEnd) {
+ currentNode = blockSeekTo(pos);
}
- int result = -1;
- if (pos < filelen) {
- if (pos > blockEnd) {
- currentNode = blockSeekTo(pos);
- }
- result = blockStream.read();
- if (result >= 0) {
- pos++;
- }
+ int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+ int result = blockStream.read(buf, off, realLen);
+ if (result >= 0) {
+ pos += result;
}
return result;
- }
-
- /**
- * Read the entire buffer.
- */
- public synchronized int read(byte buf[], int off, int len) throws IOException {
- checkOpen();
- if (closed) {
- throw new IOException("Stream closed");
- }
- if (pos < filelen) {
- int retries = 2;
- while (retries > 0) {
- try {
- if (pos > blockEnd) {
- currentNode = blockSeekTo(pos);
- }
- int realLen = Math.min(len, (int) (blockEnd - pos + 1));
- int result = blockStream.read(buf, off, realLen);
- if (result >= 0) {
- pos += result;
- }
- return result;
- } catch (IOException e) {
- if (retries == 1) {
- LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
- }
- blockEnd = -1;
- if (currentNode != null) { deadNodes.add(currentNode); }
- if (--retries == 0) {
- throw e;
- }
- }
- }
+ } catch (IOException e) {
+ if (retries == 1) {
+ LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
+ }
+ blockEnd = -1;
+ if (currentNode != null) { deadNodes.add(currentNode); }
+ if (--retries == 0) {
+ throw e;
}
- return -1;
+ }
}
+ }
+ return -1;
+ }
- private DNAddrPair chooseDataNode(int blockId)
- throws IOException {
- int failures = 0;
- while (true) {
- try {
- DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);
- InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());
- return new DNAddrPair(chosenNode, targetAddr);
- } catch (IOException ie) {
- String blockInfo =
- blocks[blockId]+" file="+src;
- if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
- throw new IOException("Could not obtain block: " + blockInfo);
- }
- if (nodes[blockId] == null || nodes[blockId].length == 0) {
- LOG.info("No node available for block: " + blockInfo);
- }
- LOG.info("Could not obtain block " + blockId + " from any node: " + ie);
- try {
- Thread.sleep(3000);
- } catch (InterruptedException iex) {
- }
- deadNodes.clear(); //2nd option is to remove only nodes[blockId]
- openInfo();
- failures++;
- continue;
- }
+ private DNAddrPair chooseDataNode(int blockId)
+ throws IOException {
+ int failures = 0;
+ while (true) {
+ try {
+ DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);
+ InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());
+ return new DNAddrPair(chosenNode, targetAddr);
+ } catch (IOException ie) {
+ String blockInfo =
+ blocks[blockId]+" file="+src;
+ if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+ throw new IOException("Could not obtain block: " + blockInfo);
+ }
+ if (nodes[blockId] == null || nodes[blockId].length == 0) {
+ LOG.info("No node available for block: " + blockInfo);
+ }
+ LOG.info("Could not obtain block " + blockId + " from any node: " + ie);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException iex) {
}
- }
+ deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+ openInfo();
+ failures++;
+ continue;
+ }
+ }
+ }
- private void fetchBlockByteRange(int blockId, long start,
- long end, byte[] buf, int offset) throws IOException {
- //
- // Connect to best DataNode for desired Block, with potential offset
- //
- Socket dn = null;
- while (dn == null) {
- DNAddrPair retval = chooseDataNode(blockId);
- DatanodeInfo chosenNode = retval.info;
- InetSocketAddress targetAddr = retval.addr;
+ private void fetchBlockByteRange(int blockId, long start,
+ long end, byte[] buf, int offset) throws IOException {
+ //
+ // Connect to best DataNode for desired Block, with potential offset
+ //
+ Socket dn = null;
+ while (dn == null) {
+ DNAddrPair retval = chooseDataNode(blockId);
+ DatanodeInfo chosenNode = retval.info;
+ InetSocketAddress targetAddr = retval.addr;
- try {
- dn = new Socket();
- dn.connect(targetAddr, READ_TIMEOUT);
- dn.setSoTimeout(READ_TIMEOUT);
+ try {
+ dn = new Socket();
+ dn.connect(targetAddr, READ_TIMEOUT);
+ dn.setSoTimeout(READ_TIMEOUT);
- //
- // Xmit header info to datanode
- //
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
- out.write(OP_READ_RANGE_BLOCK);
- blocks[blockId].write(out);
- out.writeLong(start);
- out.writeLong(end);
- out.flush();
+ //
+ // Xmit header info to datanode
+ //
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
+ out.write(OP_READ_RANGE_BLOCK);
+ blocks[blockId].write(out);
+ out.writeLong(start);
+ out.writeLong(end);
+ out.flush();
- //
- // Get bytes in block, set streams
- //
- DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
- long curBlockSize = in.readLong();
- long actualStart = in.readLong();
- long actualEnd = in.readLong();
- if (curBlockSize != blocks[blockId].len) {
- throw new IOException("Recorded block size is " +
- blocks[blockId].len + ", but datanode reports size of " +
- curBlockSize);
- }
- if ((actualStart != start) || (actualEnd != end)) {
- throw new IOException("Asked for byte range " + start +
- "-" + end + ", but only received range " + actualStart +
- "-" + actualEnd);
- }
- int nread = in.read(buf, offset, (int)(end - start + 1));
- } catch (IOException ex) {
- // Put chosen node into dead list, continue
- LOG.debug("Failed to connect to " + targetAddr + ":"
- + StringUtils.stringifyException(ex));
- deadNodes.add(chosenNode);
- if (dn != null) {
- try {
- dn.close();
- } catch (IOException iex) {
- }
- }
- dn = null;
+ //
+ // Get bytes in block, set streams
+ //
+ DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
+ long curBlockSize = in.readLong();
+ long actualStart = in.readLong();
+ long actualEnd = in.readLong();
+ if (curBlockSize != blocks[blockId].len) {
+ throw new IOException("Recorded block size is " +
+ blocks[blockId].len + ", but datanode reports size of " +
+ curBlockSize);
+ }
+ if ((actualStart != start) || (actualEnd != end)) {
+ throw new IOException("Asked for byte range " + start +
+ "-" + end + ", but only received range " + actualStart +
+ "-" + actualEnd);
+ }
+ int nread = in.read(buf, offset, (int)(end - start + 1));
+ } catch (IOException ex) {
+ // Put chosen node into dead list, continue
+ LOG.debug("Failed to connect to " + targetAddr + ":"
+ + StringUtils.stringifyException(ex));
+ deadNodes.add(chosenNode);
+ if (dn != null) {
+ try {
+ dn.close();
+ } catch (IOException iex) {
}
}
+ dn = null;
}
+ }
+ }
- public int read(long position, byte[] buf, int off, int len)
- throws IOException {
- // sanity checks
- checkOpen();
- if (closed) {
- throw new IOException("Stream closed");
- }
- if ((position < 0) || (position > filelen)) {
- return -1;
- }
- int realLen = len;
- if ((position + len) > filelen) {
- realLen = (int)(filelen - position);
- }
- // determine the block and byte range within the block
- // corresponding to position and realLen
- int targetBlock = -1;
- long targetStart = 0;
- long targetEnd = 0;
- for (int idx = 0; idx < blocks.length; idx++) {
- long blocklen = blocks[idx].getNumBytes();
- targetEnd = targetStart + blocklen - 1;
- if (position >= targetStart && position <= targetEnd) {
- targetBlock = idx;
- targetStart = position - targetStart;
- targetEnd = Math.min(blocklen, targetStart + realLen) - 1;
- realLen = (int)(targetEnd - targetStart + 1);
- break;
- }
- targetStart += blocklen;
- }
- if (targetBlock < 0) {
- throw new IOException(
- "Impossible situation: could not find target position "+
- position);
- }
- fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);
- return realLen;
+ public int read(long position, byte[] buf, int off, int len)
+ throws IOException {
+ // sanity checks
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ if ((position < 0) || (position > filelen)) {
+ return -1;
+ }
+ int realLen = len;
+ if ((position + len) > filelen) {
+ realLen = (int)(filelen - position);
+ }
+ // determine the block and byte range within the block
+ // corresponding to position and realLen
+ int targetBlock = -1;
+ long targetStart = 0;
+ long targetEnd = 0;
+ for (int idx = 0; idx < blocks.length; idx++) {
+ long blocklen = blocks[idx].getNumBytes();
+ targetEnd = targetStart + blocklen - 1;
+ if (position >= targetStart && position <= targetEnd) {
+ targetBlock = idx;
+ targetStart = position - targetStart;
+ targetEnd = Math.min(blocklen, targetStart + realLen) - 1;
+ realLen = (int)(targetEnd - targetStart + 1);
+ break;
}
+ targetStart += blocklen;
+ }
+ if (targetBlock < 0) {
+ throw new IOException(
+ "Impossible situation: could not find target position "+
+ position);
+ }
+ fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);
+ return realLen;
+ }
- /**
- * Seek to a new arbitrary location
- */
- public synchronized void seek(long targetPos) throws IOException {
- if (targetPos > filelen) {
- throw new IOException("Cannot seek after EOF");
- }
- boolean done = false;
- if (pos <= targetPos && targetPos <= blockEnd) {
- //
- // If this seek is to a positive position in the current
- // block, and this piece of data might already be lying in
- // the TCP buffer, then just eat up the intervening data.
- //
- int diff = (int)(targetPos - pos);
- if (diff <= TCP_WINDOW_SIZE) {
- blockStream.skipBytes(diff);
- pos += diff;
- assert(pos == targetPos);
- done = true;
- }
- }
- if (!done) {
- pos = targetPos;
- blockEnd = -1;
- }
+ /**
+ * Seek to a new arbitrary location
+ */
+ public synchronized void seek(long targetPos) throws IOException {
+ if (targetPos > filelen) {
+ throw new IOException("Cannot seek after EOF");
+ }
+ boolean done = false;
+ if (pos <= targetPos && targetPos <= blockEnd) {
+ //
+ // If this seek is to a positive position in the current
+ // block, and this piece of data might already be lying in
+ // the TCP buffer, then just eat up the intervening data.
+ //
+ int diff = (int)(targetPos - pos);
+ if (diff <= TCP_WINDOW_SIZE) {
+ blockStream.skipBytes(diff);
+ pos += diff;
+ assert(pos == targetPos);
+ done = true;
}
+ }
+ if (!done) {
+ pos = targetPos;
+ blockEnd = -1;
+ }
+ }
- /**
- * Seek to given position on a node other than the current node. If
- * a node other than the current node is found, then returns true.
- * If another node could not be found, then returns false.
- */
- public synchronized boolean seekToNewSource(long targetPos) throws IOException {
- boolean markedDead = deadNodes.contains(currentNode);
- deadNodes.add(currentNode);
- DatanodeInfo oldNode = currentNode;
- DatanodeInfo newNode = blockSeekTo(targetPos);
- if ( !markedDead ) {
- /* remove it from deadNodes. blockSeekTo could have cleared
- * deadNodes and added currentNode again. Thats ok. */
- deadNodes.remove(oldNode);
- }
- if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
- currentNode = newNode;
- return true;
- } else {
- return false;
- }
- }
+ /**
+ * Seek to given position on a node other than the current node. If
+ * a node other than the current node is found, then returns true.
+ * If another node could not be found, then returns false.
+ */
+ public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+ boolean markedDead = deadNodes.contains(currentNode);
+ deadNodes.add(currentNode);
+ DatanodeInfo oldNode = currentNode;
+ DatanodeInfo newNode = blockSeekTo(targetPos);
+ if ( !markedDead ) {
+ /* remove it from deadNodes. blockSeekTo could have cleared
+ * deadNodes and added currentNode again. Thats ok. */
+ deadNodes.remove(oldNode);
+ }
+ if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
+ currentNode = newNode;
+ return true;
+ } else {
+ return false;
+ }
+ }
- /**
- */
- public synchronized long getPos() throws IOException {
- return pos;
- }
+ /**
+ */
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
- /**
- */
- public synchronized int available() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
- return (int) (filelen - pos);
- }
+ /**
+ */
+ public synchronized int available() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ return (int) (filelen - pos);
+ }
- /**
- * We definitely don't support marks
- */
- public boolean markSupported() {
- return false;
- }
- public void mark(int readLimit) {
- }
- public void reset() throws IOException {
- throw new IOException("Mark not supported");
- }
+ /**
+ * We definitely don't support marks
+ */
+ public boolean markSupported() {
+ return false;
+ }
+ public void mark(int readLimit) {
}
+ public void reset() throws IOException {
+ throw new IOException("Mark not supported");
+ }
+ }
- static class DFSDataInputStream extends FSDataInputStream {
- DFSDataInputStream(DFSInputStream in, Configuration conf)
+ static class DFSDataInputStream extends FSDataInputStream {
+ DFSDataInputStream(DFSInputStream in, Configuration conf)
throws IOException {
- super(in, conf);
- }
+ super(in, conf);
+ }
- DFSDataInputStream(DFSInputStream in, int bufferSize) throws IOException {
- super(in, bufferSize);
- }
+ DFSDataInputStream(DFSInputStream in, int bufferSize) throws IOException {
+ super(in, bufferSize);
+ }
- /**
- * Returns the datanode from which the stream is currently reading.
- */
- public DatanodeInfo getCurrentDatanode() {
- return ((DFSInputStream)inStream).getCurrentDatanode();
- }
+ /**
+ * Returns the datanode from which the stream is currently reading.
+ */
+ public DatanodeInfo getCurrentDatanode() {
+ return ((DFSInputStream)inStream).getCurrentDatanode();
+ }
- /**
- * Returns the block containing the target position.
- */
- public Block getCurrentBlock() {
- return ((DFSInputStream)inStream).getCurrentBlock();
- }
-
- /**
- * Used by the automatic tests to detemine blocks locations of a
- * file
- */
- synchronized DatanodeInfo[][] getDataNodes() {
- return ((DFSInputStream)inStream).getDataNodes();
+ /**
+ * Returns the block containing the target position.
+ */
+ public Block getCurrentBlock() {
+ return ((DFSInputStream)inStream).getCurrentBlock();
+ }
+
+ /**
+ * Used by the automatic tests to detemine blocks locations of a
+ * file
+ */
+ synchronized DatanodeInfo[][] getDataNodes() {
+ return ((DFSInputStream)inStream).getDataNodes();
+ }
+
+ }
+
+ /****************************************************************
+ * DFSOutputStream creates files from a stream of bytes.
+ ****************************************************************/
+ class DFSOutputStream extends OutputStream {
+ private Socket s;
+ boolean closed = false;
+
+ private byte outBuf[] = new byte[BUFFER_SIZE];
+ private int pos = 0;
+
+ private UTF8 src;
+ private boolean overwrite;
+ private short replication;
+ private boolean firstTime = true;
+ private DataOutputStream blockStream;
+ private DataInputStream blockReplyStream;
+ private File backupFile;
+ private OutputStream backupStream;
+ private Block block;
+ private long filePos = 0;
+ private int bytesWrittenToBlock = 0;
+ private String datanodeName;
+ private long blockSize;
+
+ private Progressable progress;
+ /**
+ * Create a new output stream to the given DataNode.
+ */
+ public DFSOutputStream(UTF8 src, boolean overwrite,
+ short replication, long blockSize,
+ Progressable progress
+ ) throws IOException {
+ this.src = src;
+ this.overwrite = overwrite;
+ this.replication = replication;
+ this.backupFile = newBackupFile();
+ this.blockSize = blockSize;
+ this.backupStream = new FileOutputStream(backupFile);
+ this.progress = progress;
+ if (progress != null) {
+ LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
}
+ }
+ /* Wrapper for closing backupStream. This sets backupStream to null so
+ * that we do not attempt to write to backupStream that could be
+ * invalid in subsequent writes. Otherwise we might end trying to write
+ * filedescriptor that we don't own.
+ */
+ private void closeBackupStream() throws IOException {
+ if ( backupStream != null ) {
+ OutputStream stream = backupStream;
+ backupStream = null;
+ stream.close();
+ }
+ }
+ /* Similar to closeBackupStream(). Theoritically deleting a file
+ * twice could result in deleting a file that we should not.
+ */
+ private void deleteBackupFile() {
+ if ( backupFile != null ) {
+ File file = backupFile;
+ backupFile = null;
+ file.delete();
+ }
+ }
+
+ private File newBackupFile() throws IOException {
+ File result = conf.getFile("dfs.client.buffer.dir",
+ "tmp"+File.separator+
+ "client-"+Math.abs(r.nextLong()));
+ result.deleteOnExit();
+ return result;
}
- /****************************************************************
- * DFSOutputStream creates files from a stream of bytes.
- ****************************************************************/
- class DFSOutputStream extends OutputStream {
- private Socket s;
- boolean closed = false;
-
- private byte outBuf[] = new byte[BUFFER_SIZE];
- private int pos = 0;
-
- private UTF8 src;
- private boolean overwrite;
- private short replication;
- private boolean firstTime = true;
- private DataOutputStream blockStream;
- private DataInputStream blockReplyStream;
- private File backupFile;
- private OutputStream backupStream;
- private Block block;
- private long filePos = 0;
- private int bytesWrittenToBlock = 0;
- private String datanodeName;
- private long blockSize;
-
- private Progressable progress;
- /**
- * Create a new output stream to the given DataNode.
- */
- public DFSOutputStream(UTF8 src, boolean overwrite,
- short replication, long blockSize,
- Progressable progress
- ) throws IOException {
- this.src = src;
- this.overwrite = overwrite;
- this.replication = replication;
- this.backupFile = newBackupFile();
- this.blockSize = blockSize;
- this.backupStream = new FileOutputStream(backupFile);
- this.progress = progress;
- if (progress != null) {
- LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
- }
+ /**
+ * Open a DataOutputStream to a DataNode so that it can be written to.
+ * This happens when a file is created and each time a new block is allocated.
+ * Must get block ID and the IDs of the destinations from the namenode.
+ */
+ private synchronized void nextBlockOutputStream() throws IOException {
+ boolean retry = false;
+ long startTime = System.currentTimeMillis();
+ do {
+ retry = false;
+
+ LocatedBlock lb;
+ if (firstTime) {
+ lb = locateNewBlock();
+ } else {
+ lb = locateFollowingBlock(startTime);
+ }
+
+ block = lb.getBlock();
+ if ( block.getNumBytes() < bytesWrittenToBlock ) {
+ block.setNumBytes( bytesWrittenToBlock );
}
+ DatanodeInfo nodes[] = lb.getLocations();
- /* Wrapper for closing backupStream. This sets backupStream to null so
- * that we do not attempt to write to backupStream that could be
- * invalid in subsequent writes. Otherwise we might end trying to write
- * filedescriptor that we don't own.
- */
- private void closeBackupStream() throws IOException {
- if ( backupStream != null ) {
- OutputStream stream = backupStream;
- backupStream = null;
- stream.close();
- }
- }
- /* Similar to closeBackupStream(). Theoritically deleting a file
- * twice could result in deleting a file that we should not.
- */
- private void deleteBackupFile() {
- if ( backupFile != null ) {
- File file = backupFile;
- backupFile = null;
- file.delete();
+ //
+ // Connect to first DataNode in the list. Abort if this fails.
+ //
+ InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
+ try {
+ s = new Socket();
+ s.connect(target, READ_TIMEOUT);
+ s.setSoTimeout(replication * READ_TIMEOUT);
+ datanodeName = nodes[0].getName();
+ } catch (IOException ie) {
+ // Connection failed. Let's wait a little bit and retry
+ try {
+ if (System.currentTimeMillis() - startTime > 5000) {
+ LOG.info("Waiting to find target node: " + target);
+ }
+ Thread.sleep(6000);
+ } catch (InterruptedException iex) {
+ }
+ if (firstTime) {
+ namenode.abandonFileInProgress(src.toString(),
+ clientName);
+ } else {
+ namenode.abandonBlock(block, src.toString());
}
+ retry = true;
+ continue;
}
-
- private File newBackupFile() throws IOException {
- File result = conf.getFile("dfs.client.buffer.dir",
- "tmp"+File.separator+
- "client-"+Math.abs(r.nextLong()));
- result.deleteOnExit();
- return result;
- }
-
- /**
- * Open a DataOutputStream to a DataNode so that it can be written to.
- * This happens when a file is created and each time a new block is allocated.
- * Must get block ID and the IDs of the destinations from the namenode.
- */
- private synchronized void nextBlockOutputStream() throws IOException {
- boolean retry = false;
- long startTime = System.currentTimeMillis();
- do {
- retry = false;
-
- LocatedBlock lb;
- if (firstTime) {
- lb = locateNewBlock();
- } else {
- lb = locateFollowingBlock(startTime);
- }
-
- block = lb.getBlock();
- if ( block.getNumBytes() < bytesWrittenToBlock ) {
- block.setNumBytes( bytesWrittenToBlock );
- }
- DatanodeInfo nodes[] = lb.getLocations();
-
- //
- // Connect to first DataNode in the list. Abort if this fails.
- //
- InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
- try {
- s = new Socket();
- s.connect(target, READ_TIMEOUT);
- s.setSoTimeout(replication * READ_TIMEOUT);
- datanodeName = nodes[0].getName();
- } catch (IOException ie) {
- // Connection failed. Let's wait a little bit and retry
- try {
- if (System.currentTimeMillis() - startTime > 5000) {
- LOG.info("Waiting to find target node: " + target);
- }
- Thread.sleep(6000);
- } catch (InterruptedException iex) {
- }
- if (firstTime) {
- namenode.abandonFileInProgress(src.toString(),
- clientName);
- } else {
- namenode.abandonBlock(block, src.toString());
- }
- retry = true;
- continue;
- }
-
- //
- // Xmit header info to datanode
- //
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
- out.write(OP_WRITE_BLOCK);
- out.writeBoolean(true);
- block.write(out);
- out.writeInt(nodes.length);
- for (int i = 0; i < nodes.length; i++) {
- nodes[i].write(out);
- }
- out.write(CHUNKED_ENCODING);
- blockStream = out;
- blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
- } while (retry);
- firstTime = false;
- }
-
- private LocatedBlock locateNewBlock() throws IOException {
- int retries = 3;
- while (true) {
- while (true) {
+
+ //
+ // Xmit header info to datanode
+ //
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+ out.write(OP_WRITE_BLOCK);
+ out.writeBoolean(true);
+ block.write(out);
+ out.writeInt(nodes.length);
+ for (int i = 0; i < nodes.length; i++) {
+ nodes[i].write(out);
+ }
+ out.write(CHUNKED_ENCODING);
+ blockStream = out;
+ blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+ } while (retry);
+ firstTime = false;
+ }
+
+ private LocatedBlock locateNewBlock() throws IOException {
+ int retries = 3;
+ while (true) {
+ while (true) {
+ try {
+ return namenode.create(src.toString(), clientName.toString(),
+ overwrite, replication, blockSize);
+ } catch (RemoteException e) {
+ if (--retries == 0 ||
+ !AlreadyBeingCreatedException.class.getName().
+ equals(e.getClassName())) {
+ throw e;
+ } else {
+ // because failed tasks take upto LEASE_PERIOD to
+ // release their pendingCreates files, if the file
+ // we want to create is already being created,
+ // wait and try again.
+ LOG.info(StringUtils.stringifyException(e));
try {
- return namenode.create(src.toString(), clientName.toString(),
- overwrite, replication, blockSize);
- } catch (RemoteException e) {
- if (--retries == 0 ||
- !AlreadyBeingCreatedException.class.getName().
- equals(e.getClassName())) {
- throw e;
- } else {
- // because failed tasks take upto LEASE_PERIOD to
- // release their pendingCreates files, if the file
- // we want to create is already being created,
- // wait and try again.
- LOG.info(StringUtils.stringifyException(e));
- try {
- Thread.sleep(LEASE_SOFTLIMIT_PERIOD);
- } catch (InterruptedException ie) {
- }
- }
+ Thread.sleep(LEASE_SOFTLIMIT_PERIOD);
+ } catch (InterruptedException ie) {
}
}
}
}
+ }
+ }
- private LocatedBlock locateFollowingBlock(long start
- ) throws IOException {
- int retries = 5;
- long sleeptime = 400;
- while (true) {
- long localstart = System.currentTimeMillis();
- while (true) {
+ private LocatedBlock locateFollowingBlock(long start
+ ) throws IOException {
+ int retries = 5;
+ long sleeptime = 400;
+ while (true) {
+ long localstart = System.currentTimeMillis();
+ while (true) {
+ try {
+ return namenode.addBlock(src.toString(),
+ clientName.toString());
+ } catch (RemoteException e) {
+ if (--retries == 0 ||
+ !NotReplicatedYetException.class.getName().
+ equals(e.getClassName())) {
+ throw e;
+ } else {
+ LOG.info(StringUtils.stringifyException(e));
+ if (System.currentTimeMillis() - localstart > 5000) {
+ LOG.info("Waiting for replication for " +
+ (System.currentTimeMillis() - localstart)/1000 +
+ " seconds");
+ }
try {
- return namenode.addBlock(src.toString(),
- clientName.toString());
- } catch (RemoteException e) {
- if (--retries == 0 ||
- !NotReplicatedYetException.class.getName().
- equals(e.getClassName())) {
- throw e;
- } else {
- LOG.info(StringUtils.stringifyException(e));
- if (System.currentTimeMillis() - localstart > 5000) {
- LOG.info("Waiting for replication for " +
- (System.currentTimeMillis() - localstart)/1000 +
- " seconds");
- }
- try {
- LOG.debug("NotReplicatedYetException sleeping " + src +
- " retries left " + retries);
- Thread.sleep(sleeptime);
- } catch (InterruptedException ie) {
- }
- }
+ LOG.debug("NotReplicatedYetException sleeping " + src +
+ " retries left " + retries);
+ Thread.sleep(sleeptime);
+ } catch (InterruptedException ie) {
}
- }
- }
+ }
+ }
}
+ }
+ }
- /**
- * We're referring to the file pos here
- */
- public synchronized long getPos() throws IOException {
- return filePos;
- }
+ /**
+ * We're referring to the file pos here
+ */
+ public synchronized long getPos() throws IOException {
+ return filePos;
+ }
- /**
- * Writes the specified byte to this output stream.
- */
- public synchronized void write(int b) throws IOException {
- checkOpen();
- if (closed) {
- throw new IOException("Stream closed");
- }
+ /**
+ * Writes the specified byte to this output stream.
+ */
+ public synchronized void write(int b) throws IOException {
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
- if ((bytesWrittenToBlock + pos == blockSize) ||
- (pos >= BUFFER_SIZE)) {
- flush();
- }
- outBuf[pos++] = (byte) b;
- filePos++;
- }
+ if ((bytesWrittenToBlock + pos == blockSize) ||
+ (pos >= BUFFER_SIZE)) {
+ flush();
+ }
+ outBuf[pos++] = (byte) b;
+ filePos++;
+ }
- /**
- * Writes the specified bytes to this output stream.
- */
- public synchronized void write(byte b[], int off, int len)
- throws IOException {
- checkOpen();
- if (closed) {
- throw new IOException("Stream closed");
- }
- while (len > 0) {
- int remaining = Math.min(BUFFER_SIZE - pos,
- (int)((blockSize - bytesWrittenToBlock) - pos));
- int toWrite = Math.min(remaining, len);
- System.arraycopy(b, off, outBuf, pos, toWrite);
- pos += toWrite;
- off += toWrite;
- len -= toWrite;
- filePos += toWrite;
-
- if ((bytesWrittenToBlock + pos >= blockSize) ||
- (pos == BUFFER_SIZE)) {
- flush();
- }
- }
+ /**
+ * Writes the specified bytes to this output stream.
+ */
+ public synchronized void write(byte b[], int off, int len)
+ throws IOException {
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ while (len > 0) {
+ int remaining = Math.min(BUFFER_SIZE - pos,
+ (int)((blockSize - bytesWrittenToBlock) - pos));
+ int toWrite = Math.min(remaining, len);
+ System.arraycopy(b, off, outBuf, pos, toWrite);
+ pos += toWrite;
+ off += toWrite;
+ len -= toWrite;
+ filePos += toWrite;
+
+ if ((bytesWrittenToBlock + pos >= blockSize) ||
+ (pos == BUFFER_SIZE)) {
+ flush();
}
+ }
+ }
- /**
- * Flush the buffer, getting a stream to a new block if necessary.
- */
- public synchronized void flush() throws IOException {
- checkOpen();
- if (closed) {
- throw new IOException("Stream closed");
- }
+ /**
+ * Flush the buffer, getting a stream to a new block if necessary.
+ */
+ public synchronized void flush() throws IOException {
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
- if (bytesWrittenToBlock + pos >= blockSize) {
- flushData((int) blockSize - bytesWrittenToBlock);
- }
- if (bytesWrittenToBlock == blockSize) {
- endBlock();
- }
- flushData(pos);
- }
+ if (bytesWrittenToBlock + pos >= blockSize) {
+ flushData((int) blockSize - bytesWrittenToBlock);
+ }
+ if (bytesWrittenToBlock == blockSize) {
+ endBlock();
+ }
+ flushData(pos);
+ }
- /**
- * Actually flush the accumulated bytes to the remote node,
- * but no more bytes than the indicated number.
- */
- private synchronized void flushData(int maxPos) throws IOException {
- int workingPos = Math.min(pos, maxPos);
+ /**
+ * Actually flush the accumulated bytes to the remote node,
+ * but no more bytes than the indicated number.
+ */
+ private synchronized void flushData(int maxPos) throws IOException {
+ int workingPos = Math.min(pos, maxPos);
- if (workingPos > 0) {
- if ( backupStream == null ) {
- throw new IOException( "Trying to write to backupStream " +
- "but it already closed or not open");
- }
- //
- // To the local block backup, write just the bytes
- //
- backupStream.write(outBuf, 0, workingPos);
-
- //
- // Track position
- //
- bytesWrittenToBlock += workingPos;
- System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
- pos -= workingPos;
- }
- }
-
- /**
- * We're done writing to the current block.
- */
- private synchronized void endBlock() throws IOException {
- long sleeptime = 400;
- //
- // Done with local copy
- //
- closeBackupStream();
-
- //
- // Send it to datanode
- //
- boolean sentOk = false;
- int remainingAttempts =
- conf.getInt("dfs.client.block.write.retries", 3);
- while (!sentOk) {
- nextBlockOutputStream();
- InputStream in = new FileInputStream(backupFile);
- try {
- byte buf[] = new byte[BUFFER_SIZE];
- int bytesRead = in.read(buf);
- while (bytesRead > 0) {
- blockStream.writeLong((long) bytesRead);
- blockStream.write(buf, 0, bytesRead);
- if (progress != null) { progress.progress(); }
- bytesRead = in.read(buf);
- }
- internalClose();
- sentOk = true;
- } catch (IOException ie) {
- handleSocketException(ie);
- remainingAttempts -= 1;
- if (remainingAttempts == 0) {
- throw ie;
- }
- try {
- Thread.sleep(sleeptime);
- } catch (InterruptedException e) {
- }
- } finally {
- in.close();
- }
- }
+ if (workingPos > 0) {
+ if ( backupStream == null ) {
+ throw new IOException( "Trying to write to backupStream " +
+ "but it already closed or not open");
+ }
+ //
+ // To the local block backup, write just the bytes
+ //
+ backupStream.write(outBuf, 0, workingPos);
+
+ //
+ // Track position
+ //
+ bytesWrittenToBlock += workingPos;
+ System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
+ pos -= workingPos;
+ }
+ }
- bytesWrittenToBlock = 0;
- //
- // Delete local backup, start new one
- //
- deleteBackupFile();
- File tmpFile = newBackupFile();
- bytesWrittenToBlock = 0;
- backupStream = new FileOutputStream(tmpFile);
- backupFile = tmpFile;
+ /**
+ * We're done writing to the current block.
+ */
+ private synchronized void endBlock() throws IOException {
+ long sleeptime = 400;
+ //
+ // Done with local copy
+ //
+ closeBackupStream();
+
+ //
+ // Send it to datanode
+ //
+ boolean sentOk = false;
+ int remainingAttempts =
+ conf.getInt("dfs.client.block.write.retries", 3);
+ while (!sentOk) {
+ nextBlockOutputStream();
+ InputStream in = new FileInputStream(backupFile);
+ try {
+ byte buf[] = new byte[BUFFER_SIZE];
+ int bytesRead = in.read(buf);
+ while (bytesRead > 0) {
+ blockStream.writeLong((long) bytesRead);
+ blockStream.write(buf, 0, bytesRead);
+ if (progress != null) { progress.progress(); }
+ bytesRead = in.read(buf);
+ }
+ internalClose();
+ sentOk = true;
+ } catch (IOException ie) {
+ handleSocketException(ie);
+ remainingAttempts -= 1;
+ if (remainingAttempts == 0) {
+ throw ie;
+ }
+ try {
+ Thread.sleep(sleeptime);
+ } catch (InterruptedException e) {
+ }
+ } finally {
+ in.close();
}
+ }
- /**
- * Close down stream to remote datanode.
- */
- private synchronized void internalClose() throws IOException {
- try {
- blockStream.writeLong(0);
- blockStream.flush();
+ bytesWrittenToBlock = 0;
+ //
+ // Delete local backup, start new one
+ //
+ deleteBackupFile();
+ File tmpFile = newBackupFile();
+ bytesWrittenToBlock = 0;
+ backupStream = new FileOutputStream(tmpFile);
+ backupFile = tmpFile;
+ }
- long complete = blockReplyStream.readLong();
- if (complete != WRITE_COMPLETE) {
- LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
- throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
- }
- } catch (IOException ie) {
- throw (IOException)
- new IOException("failure closing block of file " +
- src.toString() + " to node " +
- (datanodeName == null ? "?" : datanodeName)
- ).initCause(ie);
- }
+ /**
+ * Close down stream to remote datanode.
+ */
+ private synchronized void internalClose() throws IOException {
+ try {
+ blockStream.writeLong(0);
+ blockStream.flush();
+
+ long complete = blockReplyStream.readLong();
+ if (complete != WRITE_COMPLETE) {
+ LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
+ throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
+ }
+ } catch (IOException ie) {
+ throw (IOException)
+ new IOException("failure closing block of file " +
+ src.toString() + " to node " +
+ (datanodeName == null ? "?" : datanodeName)
+ ).initCause(ie);
+ }
- LocatedBlock lb = new LocatedBlock();
- lb.readFields(blockReplyStream);
+ LocatedBlock lb = new LocatedBlock();
[... 130 lines stripped ...]