You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jo...@apache.org on 2008/08/14 19:58:31 UTC
svn commit: r685979 [3/3] - in /hadoop/core/trunk: ./
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/test/org/apache/hadoop/hdfs/
src/test/org/apache/hadoop/hdfs/server/datanode/
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Thread for processing incoming/outgoing data stream.
+ */
+class DataXceiver implements Runnable, FSConstants {
+ public static final Log LOG = DataNode.LOG;
+
+ Socket s;
+ String remoteAddress; // address of remote side
+ String localAddress; // local address of this daemon
+ DataNode datanode;
+ DataXceiverServer dataXceiverServer;
+
+ /**
+  * Creates the handler for one accepted data-transfer connection.
+  *
+  * The socket is registered in {@code dataXceiverServer.childSockets} so
+  * that {@code DataXceiverServer.kill()} can close it; {@link #run()}
+  * removes the registration again when the request has been served.
+  *
+  * @param s accepted socket this handler serves
+  * @param datanode datanode on whose behalf the request is served
+  * @param dataXceiverServer server that accepted the socket
+  */
+ public DataXceiver(Socket s, DataNode datanode,
+     DataXceiverServer dataXceiverServer) {
+
+   this.s = s;
+   this.datanode = datanode;
+   this.dataXceiverServer = dataXceiverServer;
+   // Without this registration childSockets stays empty and kill() can
+   // never close in-flight connections.
+   dataXceiverServer.childSockets.put(s, s);
+   InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
+   remoteAddress = isock.toString();
+   // getInetAddress() is the remote peer; the local end of the connection
+   // is getLocalAddress().
+   localAddress = s.getLocalAddress() + ":" + s.getLocalPort();
+   LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+ }
+
+ /**
+ * Read/write data from/to the DataXceiveServer.
+ */
+ public void run() {
+ DataInputStream in=null;
+ try {
+ in = new DataInputStream(
+ new BufferedInputStream(NetUtils.getInputStream(s),
+ SMALL_BUFFER_SIZE));
+ short version = in.readShort();
+ if ( version != DATA_TRANSFER_VERSION ) {
+ throw new IOException( "Version Mismatch" );
+ }
+ boolean local = s.getInetAddress().equals(s.getLocalAddress());
+ byte op = in.readByte();
+ // Make sure the xciver count is not exceeded
+ int curXceiverCount = datanode.getXceiverCount();
+ if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
+ throw new IOException("xceiverCount " + curXceiverCount
+ + " exceeds the limit of concurrent xcievers "
+ + dataXceiverServer.maxXceiverCount);
+ }
+ long startTime = DataNode.now();
+ switch ( op ) {
+ case OP_READ_BLOCK:
+ readBlock( in );
+ datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
+ if (local)
+ datanode.myMetrics.readsFromLocalClient.inc();
+ else
+ datanode.myMetrics.readsFromRemoteClient.inc();
+ break;
+ case OP_WRITE_BLOCK:
+ writeBlock( in );
+ datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
+ if (local)
+ datanode.myMetrics.writesFromLocalClient.inc();
+ else
+ datanode.myMetrics.writesFromRemoteClient.inc();
+ break;
+ case OP_READ_METADATA:
+ readMetadata( in );
+ datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
+ break;
+ case OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
+ replaceBlock(in);
+ datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
+ break;
+ case OP_COPY_BLOCK: // for balancing purpose; send to a proxy source
+ copyBlock(in);
+ datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
+ break;
+ default:
+ throw new IOException("Unknown opcode " + op + " in data stream");
+ }
+ } catch (Throwable t) {
+ LOG.error(datanode.dnRegistration + ":DataXceiver",t);
+ } finally {
+ LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+ + datanode.getXceiverCount());
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(s);
+ dataXceiverServer.childSockets.remove(s);
+ }
+ }
+
+ /**
+ * Read a block from the disk.
+ * @param in The stream to read from
+ * @throws IOException
+ */
+ private void readBlock(DataInputStream in) throws IOException {
+ //
+ // Read in the header
+ //
+ long blockId = in.readLong();
+ Block block = new Block( blockId, 0 , in.readLong());
+
+ long startOffset = in.readLong();
+ long length = in.readLong();
+
+ // send the block
+ OutputStream baseStream = NetUtils.getOutputStream(s,
+ datanode.socketWriteTimeout);
+ DataOutputStream out = new DataOutputStream(
+ new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+
+ BlockSender blockSender = null;
+ try {
+ try {
+ blockSender = new BlockSender(block, startOffset, length,
+ true, true, false, datanode);
+ } catch(IOException e) {
+ out.writeShort(OP_STATUS_ERROR);
+ throw e;
+ }
+
+ out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
+ long read = blockSender.sendBlock(out, baseStream, null); // send data
+
+ if (blockSender.isBlockReadFully()) {
+ // See if client verification succeeded.
+ // This is an optional response from client.
+ try {
+ if (in.readShort() == OP_STATUS_CHECKSUM_OK &&
+ datanode.blockScanner != null) {
+ datanode.blockScanner.verifiedByClient(block);
+ }
+ } catch (IOException ignored) {}
+ }
+
+ datanode.myMetrics.bytesRead.inc((int) read);
+ datanode.myMetrics.blocksRead.inc();
+ LOG.info(datanode.dnRegistration + " Served block " + block + " to " +
+ s.getInetAddress());
+ } catch ( SocketException ignored ) {
+ // Its ok for remote side to close the connection anytime.
+ datanode.myMetrics.blocksRead.inc();
+ } catch ( IOException ioe ) {
+ /* What exactly should we do here?
+ * Earlier version shutdown() datanode if there is disk error.
+ */
+ LOG.warn(datanode.dnRegistration + ":Got exception while serving " +
+ block + " to " +
+ s.getInetAddress() + ":\n" +
+ StringUtils.stringifyException(ioe) );
+ throw ioe;
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(blockSender);
+ }
+ }
+
+ /**
+ * Write a block to disk and, when downstream targets are listed in the
+ * request, mirror it to the first of them.
+ *
+ * The request header is read in this order: block id, generation stamp,
+ * pipeline size, recovery flag, client name, optional source datanode,
+ * number of targets and the targets themselves. A non-empty client name
+ * marks a client write; an empty one marks a replication request.
+ *
+ * @param in The stream to read from
+ * @throws IOException if the header is malformed, the block cannot be
+ * received, or (for client writes) the mirror cannot be reached
+ */
+ private void writeBlock(DataInputStream in) throws IOException {
+ DatanodeInfo srcDataNode = null;
+ LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+ " tcp no delay " + s.getTcpNoDelay());
+ //
+ // Read in the header
+ //
+ // The block length is not in the header; the server-side estimate is used.
+ Block block = new Block(in.readLong(),
+ dataXceiverServer.estimateBlockSize, in.readLong());
+ LOG.info("Receiving block " + block +
+ " src: " + remoteAddress +
+ " dest: " + localAddress);
+ int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+ boolean isRecovery = in.readBoolean(); // is this part of recovery?
+ String client = Text.readString(in); // working on behalf of this client
+ boolean hasSrcDataNode = in.readBoolean(); // is src node info present
+ if (hasSrcDataNode) {
+ srcDataNode = new DatanodeInfo();
+ srcDataNode.readFields(in);
+ }
+ int numTargets = in.readInt();
+ if (numTargets < 0) {
+ throw new IOException("Mislabelled incoming datastream.");
+ }
+ DatanodeInfo targets[] = new DatanodeInfo[numTargets];
+ for (int i = 0; i < targets.length; i++) {
+ DatanodeInfo tmp = new DatanodeInfo();
+ tmp.readFields(in);
+ targets[i] = tmp;
+ }
+
+ DataOutputStream mirrorOut = null; // stream to next target
+ DataInputStream mirrorIn = null; // reply from next target
+ DataOutputStream replyOut = null; // stream to prev target
+ Socket mirrorSock = null; // socket to next target
+ BlockReceiver blockReceiver = null; // responsible for data handling
+ String mirrorNode = null; // the name:port of next target
+ String firstBadLink = ""; // first datanode that failed in connection setup
+ try {
+ // open a block receiver and check if the block does not exist
+ blockReceiver = new BlockReceiver(block, in,
+ s.getInetAddress().toString(), isRecovery, client, srcDataNode,
+ datanode);
+
+ // get a connection back to the previous target
+ replyOut = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+
+ //
+ // Open network conn to backup machine, if
+ // appropriate
+ //
+ if (targets.length > 0) {
+ InetSocketAddress mirrorTarget = null;
+ // Connect to backup machine
+ mirrorNode = targets[0].getName();
+ mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
+ mirrorSock = datanode.newSocket();
+ try {
+ // Both timeouts grow with the number of remaining targets.
+ int timeoutValue = numTargets * datanode.socketTimeout;
+ int writeTimeout = datanode.socketWriteTimeout +
+ (WRITE_TIMEOUT_EXTENSION * numTargets);
+ mirrorSock.connect(mirrorTarget, timeoutValue);
+ mirrorSock.setSoTimeout(timeoutValue);
+ mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+ mirrorOut = new DataOutputStream(
+ new BufferedOutputStream(
+ NetUtils.getOutputStream(mirrorSock, writeTimeout),
+ SMALL_BUFFER_SIZE));
+ mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
+
+ // Write header: Copied from DFSClient.java!
+ mirrorOut.writeShort( DATA_TRANSFER_VERSION );
+ mirrorOut.write( OP_WRITE_BLOCK );
+ mirrorOut.writeLong( block.getBlockId() );
+ mirrorOut.writeLong( block.getGenerationStamp() );
+ mirrorOut.writeInt( pipelineSize );
+ mirrorOut.writeBoolean( isRecovery );
+ Text.writeString( mirrorOut, client );
+ mirrorOut.writeBoolean(hasSrcDataNode);
+ if (hasSrcDataNode) { // pass src node information
+ srcDataNode.write(mirrorOut);
+ }
+ // targets[0] is the mirror itself; forward only the remaining targets
+ mirrorOut.writeInt( targets.length - 1 );
+ for ( int i = 1; i < targets.length; i++ ) {
+ targets[i].write( mirrorOut );
+ }
+
+ blockReceiver.writeChecksumHeader(mirrorOut);
+ mirrorOut.flush();
+
+ // read connect ack (only for clients, not for replication req)
+ if (client.length() != 0) {
+ firstBadLink = Text.readString(mirrorIn);
+ if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+ LOG.info("Datanode " + targets.length +
+ " got response for connect ack " +
+ " from downstream datanode with firstbadlink as " +
+ firstBadLink);
+ }
+ }
+
+ } catch (IOException e) {
+ // For client writes, report this mirror upstream as the bad link.
+ if (client.length() != 0) {
+ Text.writeString(replyOut, mirrorNode);
+ replyOut.flush();
+ }
+ // Tear down the half-open mirror connection; the fields are nulled so
+ // the code below sees that there is no mirror.
+ IOUtils.closeStream(mirrorOut);
+ mirrorOut = null;
+ IOUtils.closeStream(mirrorIn);
+ mirrorIn = null;
+ IOUtils.closeSocket(mirrorSock);
+ mirrorSock = null;
+ // Client writes fail here; replication requests go on without a mirror.
+ if (client.length() > 0) {
+ throw e;
+ } else {
+ LOG.info(datanode.dnRegistration + ":Exception transfering block " +
+ block + " to mirror " + mirrorNode +
+ ". continuing without the mirror.\n" +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ // send connect ack back to source (only for clients)
+ if (client.length() != 0) {
+ if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+ LOG.info("Datanode " + targets.length +
+ " forwarding connect ack to upstream firstbadlink is " +
+ firstBadLink);
+ }
+ Text.writeString(replyOut, firstBadLink);
+ replyOut.flush();
+ }
+
+ // receive the block and mirror to the next target
+ // (mirrorAddr is null when there is no mirror or the connection failed)
+ String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+ blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+ mirrorAddr, null, targets.length);
+
+ // if this write is for a replication request (and not
+ // from a client), then confirm block. For client-writes,
+ // the block is finalized in the PacketResponder.
+ if (client.length() == 0) {
+ datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+ LOG.info("Received block " + block +
+ " src: " + remoteAddress +
+ " dest: " + localAddress +
+ " of size " + block.getNumBytes());
+ }
+
+ if (datanode.blockScanner != null) {
+ datanode.blockScanner.addBlock(block);
+ }
+
+ } catch (IOException ioe) {
+ LOG.info("writeBlock " + block + " received exception " + ioe);
+ throw ioe;
+ } finally {
+ // close all opened streams
+ IOUtils.closeStream(mirrorOut);
+ IOUtils.closeStream(mirrorIn);
+ IOUtils.closeStream(replyOut);
+ IOUtils.closeSocket(mirrorSock);
+ IOUtils.closeStream(blockReceiver);
+ }
+ }
+
+ /**
+ * Reads the metadata and sends the data in one 'DATA_CHUNK'.
+ * @param in
+ */
+ void readMetadata(DataInputStream in) throws IOException {
+ Block block = new Block( in.readLong(), 0 , in.readLong());
+ MetaDataInputStream checksumIn = null;
+ DataOutputStream out = null;
+
+ try {
+
+ checksumIn = datanode.data.getMetaDataInputStream(block);
+
+ long fileSize = checksumIn.getLength();
+
+ if (fileSize >= 1L<<31 || fileSize <= 0) {
+ throw new IOException("Unexpected size for checksumFile of block" +
+ block);
+ }
+
+ byte [] buf = new byte[(int)fileSize];
+ IOUtils.readFully(checksumIn, buf, 0, buf.length);
+
+ out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+
+ out.writeByte(OP_STATUS_SUCCESS);
+ out.writeInt(buf.length);
+ out.write(buf);
+
+ //last DATA_CHUNK
+ out.writeInt(0);
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(checksumIn);
+ }
+ }
+
+ /**
+  * Read a block from the disk and then send it to a destination
+  * datanode as an OP_REPLACE_BLOCK request (used for balancing).
+  *
+  * A status is always written back to the requester, and the balancing
+  * semaphore permit taken for the transfer is always released.
+  *
+  * @param in The stream to read from
+  * @throws IOException if the block cannot be read or the transfer to
+  *         the target fails
+  */
+ private void copyBlock(DataInputStream in) throws IOException {
+   // Read in the header
+   long blockId = in.readLong(); // read block id
+   Block block = new Block(blockId, 0, in.readLong());
+   String source = Text.readString(in); // read del hint
+   DatanodeInfo target = new DatanodeInfo(); // read target
+   target.readFields(in);
+
+   Socket targetSock = null;
+   short opStatus = OP_STATUS_SUCCESS;
+   BlockSender blockSender = null;
+   DataOutputStream targetOut = null;
+   try {
+     datanode.balancingSem.acquireUninterruptibly();
+
+     // check if the block exists or not
+     blockSender = new BlockSender(block, 0, -1, false, false, false,
+                                   datanode);
+
+     // get the output stream to the target
+     InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+         target.getName());
+     targetSock = datanode.newSocket();
+     targetSock.connect(targetAddr, datanode.socketTimeout);
+     targetSock.setSoTimeout(datanode.socketTimeout);
+
+     OutputStream baseStream = NetUtils.getOutputStream(targetSock,
+         datanode.socketWriteTimeout);
+     targetOut = new DataOutputStream(
+         new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+
+     /* send request to the target */
+     // first write header info
+     targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
+     targetOut.writeByte(OP_REPLACE_BLOCK); // op code
+     targetOut.writeLong(block.getBlockId()); // block id
+     targetOut.writeLong(block.getGenerationStamp()); // generation stamp
+     Text.writeString( targetOut, source); // del hint
+
+     // then send data
+     long read = blockSender.sendBlock(targetOut, baseStream,
+                                       datanode.balancingThrottler);
+
+     datanode.myMetrics.bytesRead.inc((int) read);
+     datanode.myMetrics.blocksRead.inc();
+
+     // check the response from target
+     receiveResponse(targetSock, 1);
+
+     LOG.info("Copied block " + block + " to " + targetAddr);
+   } catch (IOException ioe) {
+     opStatus = OP_STATUS_ERROR;
+     LOG.warn("Got exception while serving " + block + " to "
+         + target.getName() + ": " + StringUtils.stringifyException(ioe));
+     throw ioe;
+   } finally {
+     /* send response to the requester */
+     try {
+       sendResponse(s, opStatus, datanode.socketWriteTimeout);
+     } catch (IOException replyE) {
+       LOG.warn("Error writing the response back to "+
+           s.getRemoteSocketAddress() + "\n" +
+           StringUtils.stringifyException(replyE) );
+     }
+     IOUtils.closeStream(targetOut);
+     // targetOut is still null when connect() or stream setup failed, so
+     // the socket has to be closed explicitly or it would leak.
+     IOUtils.closeSocket(targetSock);
+     IOUtils.closeStream(blockSender);
+     datanode.balancingSem.release();
+   }
+ }
+
+ /**
+  * Receive a block and write it to disk, it then notifies the namenode to
+  * remove the copy from the source.
+  *
+  * A status is always written back to the sender once the transfer has
+  * been attempted, and the balancing semaphore permit is always released.
+  *
+  * @param in The stream to read from
+  * @throws IOException if the header cannot be read or the block cannot
+  *         be received
+  */
+ private void replaceBlock(DataInputStream in) throws IOException {
+   /* read header */
+   // The header is read BEFORE the permit is taken: these reads can throw,
+   // and a failure after acquiring but outside the try/finally would leak
+   // the permit permanently.
+   Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize,
+       in.readLong()); // block id & generation stamp
+   String sourceID = Text.readString(in);
+
+   datanode.balancingSem.acquireUninterruptibly();
+
+   short opStatus = OP_STATUS_SUCCESS;
+   BlockReceiver blockReceiver = null;
+   try {
+     // open a block receiver and check if the block does not exist
+     blockReceiver = new BlockReceiver(
+         block, in, s.getRemoteSocketAddress().toString(), false, "", null,
+         datanode);
+
+     // receive a block
+     blockReceiver.receiveBlock(null, null, null, null,
+         datanode.balancingThrottler, -1);
+
+     // notify name node
+     datanode.notifyNamenodeReceivedBlock(block, sourceID);
+
+     LOG.info("Moved block " + block +
+         " from " + s.getRemoteSocketAddress());
+   } catch (IOException ioe) {
+     opStatus = OP_STATUS_ERROR;
+     throw ioe;
+   } finally {
+     // send response back
+     try {
+       sendResponse(s, opStatus, datanode.socketWriteTimeout);
+     } catch (IOException ioe) {
+       LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
+     }
+     IOUtils.closeStream(blockReceiver);
+     datanode.balancingSem.release();
+   }
+ }
+
+ /**
+ * Utility function for receiving a response.
+ * @param s socket to read from
+ * @param numTargets number of responses to read
+ **/
+ private void receiveResponse(Socket s, int numTargets) throws IOException {
+ // check the response
+ DataInputStream reply = new DataInputStream(new BufferedInputStream(
+ NetUtils.getInputStream(s), BUFFER_SIZE));
+ try {
+ for (int i = 0; i < numTargets; i++) {
+ short opStatus = reply.readShort();
+ if(opStatus != OP_STATUS_SUCCESS) {
+ throw new IOException("operation failed at "+
+ s.getInetAddress());
+ }
+ }
+ } finally {
+ IOUtils.closeStream(reply);
+ }
+ }
+
+ /**
+ * Utility function for sending a response.
+ * @param s socket to write to
+ * @param opStatus status message to write
+ * @param timeout send timeout
+ **/
+ private void sendResponse(Socket s, short opStatus, long timeout)
+ throws IOException {
+ DataOutputStream reply =
+ new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+ try {
+ reply.writeShort(opStatus);
+ reply.flush();
+ } finally {
+ IOUtils.closeStream(reply);
+ }
+ }
+}
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Server used for receiving/sending a block of data.
+ * This is created to listen for requests from clients or
+ * other DataNodes. This small server does not use the
+ * Hadoop IPC mechanism.
+ */
+class DataXceiverServer implements Runnable, FSConstants {
+ public static final Log LOG = DataNode.LOG;
+
+ ServerSocket ss;
+ DataNode datanode;
+ // Record all sockets opened for data transfer
+ Map<Socket, Socket> childSockets = Collections.synchronizedMap(
+ new HashMap<Socket, Socket>());
+
+ /**
+ * Maximal number of concurrent xceivers per node.
+ * Enforcing the limit is required in order to avoid data-node
+ * running out of memory.
+ */
+ static final int MAX_XCEIVER_COUNT = 256;
+ int maxXceiverCount = MAX_XCEIVER_COUNT;
+
+ /**
+ * We need an estimate for block size to check if the disk partition has
+ * enough space. For now we set it to be the default block size set
+ * in the server side configuration, which is not ideal because the
+ * default block size should be a client-side configuration.
+ * A better solution is to include in the header the estimated block size,
+ * i.e. either the actual block size or the default block size.
+ */
+ long estimateBlockSize;
+
+
+ DataXceiverServer(ServerSocket ss, Configuration conf,
+ DataNode datanode) {
+
+ this.ss = ss;
+ this.datanode = datanode;
+
+ this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers",
+ MAX_XCEIVER_COUNT);
+
+ this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ }
+
+ /**
+  * Accept loop: while the datanode should run, accepts connections and
+  * hands each one to a new DataXceiver daemon thread. An IOException is
+  * logged and the loop continues; any other Throwable stops the datanode.
+  * A socket that was accepted but could not be handed to a running
+  * handler is closed instead of being leaked. The server socket is closed
+  * when the loop ends.
+  */
+ public void run() {
+   while (datanode.shouldRun) {
+     Socket s = null;
+     try {
+       s = ss.accept();
+       s.setTcpNoDelay(true);
+       new Daemon(datanode.threadGroup,
+           new DataXceiver(s, datanode, this)).start();
+     } catch (IOException ie) {
+       discard(s);
+       LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
+           + StringUtils.stringifyException(ie));
+     } catch (Throwable te) {
+       discard(s);
+       LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
+           + StringUtils.stringifyException(te));
+       datanode.shouldRun = false;
+     }
+   }
+   try {
+     ss.close();
+   } catch (IOException ie) {
+     LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
+         + StringUtils.stringifyException(ie));
+   }
+ }
+
+ /**
+  * Closes an accepted socket for which no handler thread is running and
+  * drops it from the child socket map; a null socket is ignored.
+  */
+ private void discard(Socket sock) {
+   if (sock == null) {
+     return; // accept() itself failed, nothing to clean up
+   }
+   childSockets.remove(sock);
+   try {
+     sock.close();
+   } catch (IOException ignored) {
+     // best effort: the connection is being abandoned anyway
+   }
+ }
+
+ /**
+  * Shuts the server down: closes the listening socket and then every
+  * socket currently recorded in the child socket map. Expects
+  * datanode.shouldRun to be false already.
+  */
+ void kill() {
+   assert !datanode.shouldRun :
+     "shoudRun should be set to false before killing";
+   try {
+     this.ss.close();
+   } catch (IOException ie) {
+     LOG.warn(datanode.dnRegistration + ":DataXceiveServer.kill(): "
+         + StringUtils.stringifyException(ie));
+   }
+
+   // close all the sockets that were accepted earlier; the map is locked
+   // for the whole traversal
+   synchronized (childSockets) {
+     for (Socket child : childSockets.values()) {
+       try {
+         child.close();
+       } catch (IOException e) {
+       }
+     }
+   }
+ }
+}
Copied: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (from r685529, hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?p2=hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java&p1=hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java&r1=685529&r2=685979&rev=685979&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Aug 14 10:58:30 2008
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.server.datanode;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -27,24 +27,25 @@
import java.util.List;
import java.util.Random;
+import junit.framework.TestCase;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
-
-
-import junit.framework.TestCase;
/**
* This class tests if block replacement request to data nodes work correctly.
*/
@@ -60,7 +61,7 @@
final long TOTAL_BYTES =6*bandwidthPerSec;
long bytesToSend = TOTAL_BYTES;
long start = Util.now();
- DataNode.Throttler throttler = new DataNode.Throttler(bandwidthPerSec);
+ BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
long totalBytes = 0L;
long bytesSent = 1024*512L; // 0.5MB
throttler.throttle(bytesSent);
Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
------------------------------------------------------------------------------
svn:mergeinfo =