Posted to common-commits@hadoop.apache.org by jo...@apache.org on 2008/08/14 19:58:31 UTC

svn commit: r685979 [3/3] - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/datanode/

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Thread for processing an incoming/outgoing data stream.
+ */
+class DataXceiver implements Runnable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  Socket s;
+  String remoteAddress; // address of remote side
+  String localAddress;  // local address of this daemon
+  DataNode datanode;
+  DataXceiverServer dataXceiverServer;
+  
+  public DataXceiver(Socket s, DataNode datanode, 
+      DataXceiverServer dataXceiverServer) {
+    
+    this.s = s;
+    this.datanode = datanode;
+    this.dataXceiverServer = dataXceiverServer;
+    InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
+    remoteAddress = isock.toString();
+    localAddress = s.getLocalAddress() + ":" + s.getLocalPort();
+    LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+  }
+
+  /**
+   * Read/write data from/to the DataXceiverServer.
+   */
+  public void run() {
+    DataInputStream in=null; 
+    try {
+      in = new DataInputStream(
+          new BufferedInputStream(NetUtils.getInputStream(s), 
+                                  SMALL_BUFFER_SIZE));
+      short version = in.readShort();
+      if ( version != DATA_TRANSFER_VERSION ) {
+        throw new IOException( "Version Mismatch" );
+      }
+      boolean local = s.getInetAddress().equals(s.getLocalAddress());
+      byte op = in.readByte();
+      // Make sure the xceiver count is not exceeded
+      int curXceiverCount = datanode.getXceiverCount();
+      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
+        throw new IOException("xceiverCount " + curXceiverCount
+                              + " exceeds the limit of concurrent xcievers "
+                              + dataXceiverServer.maxXceiverCount);
+      }
+      long startTime = DataNode.now();
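+      // Each connection carries exactly one operation: dispatch on the
+      // opcode and time it for the per-operation metrics below.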
+      switch ( op ) {
+      case OP_READ_BLOCK:
+        readBlock( in );
+        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
+        if (local)
+          datanode.myMetrics.readsFromLocalClient.inc();
+        else
+          datanode.myMetrics.readsFromRemoteClient.inc();
+        break;
+      case OP_WRITE_BLOCK:
+        writeBlock( in );
+        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
+        if (local)
+          datanode.myMetrics.writesFromLocalClient.inc();
+        else
+          datanode.myMetrics.writesFromRemoteClient.inc();
+        break;
+      case OP_READ_METADATA:
+        readMetadata( in );
+        datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
+        break;
+      case OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
+        replaceBlock(in);
+        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
+        break;
+      case OP_COPY_BLOCK: // for balancing purpose; send to a proxy source
+        copyBlock(in);
+        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
+        break;
+      default:
+        throw new IOException("Unknown opcode " + op + " in data stream");
+      }
+    } catch (Throwable t) {
+      LOG.error(datanode.dnRegistration + ":DataXceiver",t);
+    } finally {
+      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+                               + datanode.getXceiverCount());
+      IOUtils.closeStream(in);
+      IOUtils.closeSocket(s);
+      dataXceiverServer.childSockets.remove(s);
+    }
+  }
+
+  /**
+   * Read a block from the disk.
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void readBlock(DataInputStream in) throws IOException {
+    //
+    // Read in the header
+    //
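+    // OP_READ_BLOCK request layout: block id (long), generation stamp (long),
+    // start offset (long), number of bytes to send (long).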
+    long blockId = in.readLong();          
+    Block block = new Block( blockId, 0 , in.readLong());
+
+    long startOffset = in.readLong();
+    long length = in.readLong();
+
+    // send the block
+    OutputStream baseStream = NetUtils.getOutputStream(s, 
+        datanode.socketWriteTimeout);
+    DataOutputStream out = new DataOutputStream(
+                 new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+    
+    BlockSender blockSender = null;
+    try {
+      try {
+        blockSender = new BlockSender(block, startOffset, length, 
+                                      true, true, false, datanode);
+      } catch(IOException e) {
+        out.writeShort(OP_STATUS_ERROR);
+        throw e;
+      }
+
+      out.writeShort(OP_STATUS_SUCCESS); // send op status
+      long read = blockSender.sendBlock(out, baseStream, null); // send data
+
+      if (blockSender.isBlockReadFully()) {
+        // See if client verification succeeded. 
+        // This is an optional response from client.
+        try {
+          if (in.readShort() == OP_STATUS_CHECKSUM_OK  && 
+              datanode.blockScanner != null) {
+            datanode.blockScanner.verifiedByClient(block);
+          }
+        } catch (IOException ignored) {}
+      }
+      
+      datanode.myMetrics.bytesRead.inc((int) read);
+      datanode.myMetrics.blocksRead.inc();
+      LOG.info(datanode.dnRegistration + " Served block " + block + " to " + 
+          s.getInetAddress());
+    } catch ( SocketException ignored ) {
+      // It's OK for the remote side to close the connection at any time.
+      datanode.myMetrics.blocksRead.inc();
+    } catch ( IOException ioe ) {
+      /* What exactly should we do here?
+       * Earlier versions shut down the datanode on disk errors.
+       */
+      LOG.warn(datanode.dnRegistration +  ":Got exception while serving " + 
+          block + " to " +
+                s.getInetAddress() + ":\n" + 
+                StringUtils.stringifyException(ioe) );
+      throw ioe;
+    } finally {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(blockSender);
+    }
+  }
+
+  /**
+   * Write a block to disk.
+   * 
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void writeBlock(DataInputStream in) throws IOException {
+    DatanodeInfo srcDataNode = null;
+    LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+              " tcp no delay " + s.getTcpNoDelay());
+    //
+    // Read in the header
+    //
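+    // OP_WRITE_BLOCK header layout: block id (long), generation stamp (long),
+    // pipeline size (int), recovery flag (boolean), client name (Text),
+    // optional source datanode (boolean presence flag + DatanodeInfo), and
+    // the number of downstream targets (int) followed by their DatanodeInfos.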
+    Block block = new Block(in.readLong(), 
+        dataXceiverServer.estimateBlockSize, in.readLong());
+    LOG.info("Receiving block " + block + 
+             " src: " + remoteAddress +
+             " dest: " + localAddress);
+    int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+    boolean isRecovery = in.readBoolean(); // is this part of recovery?
+    String client = Text.readString(in); // working on behalf of this client
+    boolean hasSrcDataNode = in.readBoolean(); // is src node info present
+    if (hasSrcDataNode) {
+      srcDataNode = new DatanodeInfo();
+      srcDataNode.readFields(in);
+    }
+    int numTargets = in.readInt();
+    if (numTargets < 0) {
+      throw new IOException("Mislabelled incoming datastream.");
+    }
+    DatanodeInfo targets[] = new DatanodeInfo[numTargets];
+    for (int i = 0; i < targets.length; i++) {
+      DatanodeInfo tmp = new DatanodeInfo();
+      tmp.readFields(in);
+      targets[i] = tmp;
+    }
+
+    DataOutputStream mirrorOut = null;  // stream to next target
+    DataInputStream mirrorIn = null;    // reply from next target
+    DataOutputStream replyOut = null;   // stream to prev target
+    Socket mirrorSock = null;           // socket to next target
+    BlockReceiver blockReceiver = null; // responsible for data handling
+    String mirrorNode = null;           // the name:port of next target
+    String firstBadLink = "";           // first datanode that failed in connection setup
+    try {
+      // open a block receiver, which verifies that the block does not already exist
+      blockReceiver = new BlockReceiver(block, in, 
+          s.getInetAddress().toString(), isRecovery, client, srcDataNode,
+          datanode);
+
+      // get a connection back to the previous target
+      replyOut = new DataOutputStream(
+                     NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+
+      //
+      // Open network conn to backup machine, if 
+      // appropriate
+      //
+      if (targets.length > 0) {
+        InetSocketAddress mirrorTarget = null;
+        // Connect to backup machine
+        mirrorNode = targets[0].getName();
+        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
+        mirrorSock = datanode.newSocket();
+        try {
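+          // Scale the timeouts with the number of downstream targets: each
+          // extra hop in the pipeline delays acks coming back upstream.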
+          int timeoutValue = numTargets * datanode.socketTimeout;
+          int writeTimeout = datanode.socketWriteTimeout + 
+                             (WRITE_TIMEOUT_EXTENSION * numTargets);
+          mirrorSock.connect(mirrorTarget, timeoutValue);
+          mirrorSock.setSoTimeout(timeoutValue);
+          mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+          mirrorOut = new DataOutputStream(
+             new BufferedOutputStream(
+                         NetUtils.getOutputStream(mirrorSock, writeTimeout),
+                         SMALL_BUFFER_SIZE));
+          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
+
+          // Write header: Copied from DFSClient.java!
+          mirrorOut.writeShort( DATA_TRANSFER_VERSION );
+          mirrorOut.write( OP_WRITE_BLOCK );
+          mirrorOut.writeLong( block.getBlockId() );
+          mirrorOut.writeLong( block.getGenerationStamp() );
+          mirrorOut.writeInt( pipelineSize );
+          mirrorOut.writeBoolean( isRecovery );
+          Text.writeString( mirrorOut, client );
+          mirrorOut.writeBoolean(hasSrcDataNode);
+          if (hasSrcDataNode) { // pass src node information
+            srcDataNode.write(mirrorOut);
+          }
+          mirrorOut.writeInt( targets.length - 1 );
+          for ( int i = 1; i < targets.length; i++ ) {
+            targets[i].write( mirrorOut );
+          }
+
+          blockReceiver.writeChecksumHeader(mirrorOut);
+          mirrorOut.flush();
+
+          // read connect ack (only for clients, not for replication req)
+          if (client.length() != 0) {
+            firstBadLink = Text.readString(mirrorIn);
+            if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+              LOG.info("Datanode " + targets.length +
+                       " got response for connect ack " +
+                       " from downstream datanode with firstbadlink as " +
+                       firstBadLink);
+            }
+          }
+
+        } catch (IOException e) {
+          if (client.length() != 0) {
+            Text.writeString(replyOut, mirrorNode);
+            replyOut.flush();
+          }
+          IOUtils.closeStream(mirrorOut);
+          mirrorOut = null;
+          IOUtils.closeStream(mirrorIn);
+          mirrorIn = null;
+          IOUtils.closeSocket(mirrorSock);
+          mirrorSock = null;
+          if (client.length() > 0) {
+            throw e;
+          } else {
+            LOG.info(datanode.dnRegistration + ":Exception transfering block " +
+                     block + " to mirror " + mirrorNode +
+                     ". continuing without the mirror.\n" +
+                     StringUtils.stringifyException(e));
+          }
+        }
+      }
+
+      // send connect ack back to source (only for clients)
+      if (client.length() != 0) {
+        if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+          LOG.info("Datanode " + targets.length +
+                   " forwarding connect ack to upstream firstbadlink is " +
+                   firstBadLink);
+        }
+        Text.writeString(replyOut, firstBadLink);
+        replyOut.flush();
+      }
+
+      // receive the block and mirror to the next target
+      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+                                 mirrorAddr, null, targets.length);
+
+      // if this write is for a replication request (and not
+      // from a client), then confirm block. For client-writes,
+      // the block is finalized in the PacketResponder.
+      if (client.length() == 0) {
+        datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+        LOG.info("Received block " + block + 
+                 " src: " + remoteAddress +
+                 " dest: " + localAddress +
+                 " of size " + block.getNumBytes());
+      }
+
+      if (datanode.blockScanner != null) {
+        datanode.blockScanner.addBlock(block);
+      }
+      
+    } catch (IOException ioe) {
+      LOG.info("writeBlock " + block + " received exception " + ioe);
+      throw ioe;
+    } finally {
+      // close all opened streams
+      IOUtils.closeStream(mirrorOut);
+      IOUtils.closeStream(mirrorIn);
+      IOUtils.closeStream(replyOut);
+      IOUtils.closeSocket(mirrorSock);
+      IOUtils.closeStream(blockReceiver);
+    }
+  }
+
+  /**
+   * Reads the metadata and sends the data in one 'DATA_CHUNK'.
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  void readMetadata(DataInputStream in) throws IOException {
+    Block block = new Block( in.readLong(), 0 , in.readLong());
+    MetaDataInputStream checksumIn = null;
+    DataOutputStream out = null;
+    
+    try {
+
+      checksumIn = datanode.data.getMetaDataInputStream(block);
+      
+      long fileSize = checksumIn.getLength();
+
+      if (fileSize >= 1L<<31 || fileSize <= 0) {
+          throw new IOException("Unexpected size for checksumFile of block" +
+                  block);
+      }
+
+      byte [] buf = new byte[(int)fileSize];
+      IOUtils.readFully(checksumIn, buf, 0, buf.length);
+      
+      out = new DataOutputStream(
+                NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+      
+      out.writeByte(OP_STATUS_SUCCESS);
+      out.writeInt(buf.length);
+      out.write(buf);
+      
+      //last DATA_CHUNK
+      out.writeInt(0);
+    } finally {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(checksumIn);
+    }
+  }
+  
+  /**
+   * Reads a block from the disk and then sends it to a destination.
+   * 
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void copyBlock(DataInputStream in) throws IOException {
+    // Read in the header
+    long blockId = in.readLong(); // read block id
+    Block block = new Block(blockId, 0, in.readLong());
+    String source = Text.readString(in); // read del hint
+    DatanodeInfo target = new DatanodeInfo(); // read target
+    target.readFields(in);
+
+    Socket targetSock = null;
+    short opStatus = OP_STATUS_SUCCESS;
+    BlockSender blockSender = null;
+    DataOutputStream targetOut = null;
+    try {
+      datanode.balancingSem.acquireUninterruptibly();
+      
+      // check if the block exists or not
+      blockSender = new BlockSender(block, 0, -1, false, false, false, 
+          datanode);
+
+      // get the output stream to the target
+      InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+          target.getName());
+      targetSock = datanode.newSocket();
+      targetSock.connect(targetAddr, datanode.socketTimeout);
+      targetSock.setSoTimeout(datanode.socketTimeout);
+
+      OutputStream baseStream = NetUtils.getOutputStream(targetSock, 
+          datanode.socketWriteTimeout);
+      targetOut = new DataOutputStream(
+                     new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+
+      /* send request to the target */
+      // first write header info
+      targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
+      targetOut.writeByte(OP_REPLACE_BLOCK); // op code
+      targetOut.writeLong(block.getBlockId()); // block id
+      targetOut.writeLong(block.getGenerationStamp()); // generation stamp
+      Text.writeString( targetOut, source); // del hint
+
+      // then send data
+      long read = blockSender.sendBlock(targetOut, baseStream, 
+                                        datanode.balancingThrottler);
+
+      datanode.myMetrics.bytesRead.inc((int) read);
+      datanode.myMetrics.blocksRead.inc();
+      
+      // check the response from target
+      receiveResponse(targetSock, 1);
+
+      LOG.info("Copied block " + block + " to " + targetAddr);
+    } catch (IOException ioe) {
+      opStatus = OP_STATUS_ERROR;
+      LOG.warn("Got exception while serving " + block + " to "
+          + target.getName() + ": " + StringUtils.stringifyException(ioe));
+      throw ioe;
+    } finally {
+      /* send response to the requester */
+      try {
+        sendResponse(s, opStatus, datanode.socketWriteTimeout);
+      } catch (IOException replyE) {
+        LOG.warn("Error writing the response back to "+
+            s.getRemoteSocketAddress() + "\n" +
+            StringUtils.stringifyException(replyE) );
+      }
+      IOUtils.closeStream(targetOut);
+      IOUtils.closeStream(blockSender);
+      datanode.balancingSem.release();
+    }
+  }
+
+  /**
+   * Receives a block and writes it to disk; then notifies the namenode to
+   * remove the copy from the source.
+   * 
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void replaceBlock(DataInputStream in) throws IOException {
+    datanode.balancingSem.acquireUninterruptibly();
+
+    /* read header */
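+    // OP_REPLACE_BLOCK header layout: block id (long), generation stamp (long),
+    // source datanode storage id (Text); the block data follows on this stream.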
+    Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize,
+        in.readLong()); // block id & len
+    String sourceID = Text.readString(in);
+
+    short opStatus = OP_STATUS_SUCCESS;
+    BlockReceiver blockReceiver = null;
+    try {
+      // open a block receiver, which verifies that the block does not already exist
+      blockReceiver = new BlockReceiver(
+          block, in, s.getRemoteSocketAddress().toString(), false, "", null,
+          datanode);
+
+      // receive a block
+      blockReceiver.receiveBlock(null, null, null, null, 
+          datanode.balancingThrottler, -1);
+                    
+      // notify name node
+      datanode.notifyNamenodeReceivedBlock(block, sourceID);
+
+      LOG.info("Moved block " + block + 
+          " from " + s.getRemoteSocketAddress());
+    } catch (IOException ioe) {
+      opStatus = OP_STATUS_ERROR;
+      throw ioe;
+    } finally {
+      // send response back
+      try {
+        sendResponse(s, opStatus, datanode.socketWriteTimeout);
+      } catch (IOException ioe) {
+        LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
+      }
+      IOUtils.closeStream(blockReceiver);
+      datanode.balancingSem.release();
+    }
+  }
+  
+  /**
+   *  Utility function for receiving a response.
+   *  @param s socket to read from
+   *  @param numTargets number of responses to read
+   **/
+  private void receiveResponse(Socket s, int numTargets) throws IOException {
+    // check the response
+    DataInputStream reply = new DataInputStream(new BufferedInputStream(
+                                NetUtils.getInputStream(s), BUFFER_SIZE));
+    try {
+      for (int i = 0; i < numTargets; i++) {
+        short opStatus = reply.readShort();
+        if(opStatus != OP_STATUS_SUCCESS) {
+          throw new IOException("operation failed at "+
+              s.getInetAddress());
+        } 
+      }
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+
+  /**
+   * Utility function for sending a response.
+   * @param s socket to write to
+   * @param opStatus status message to write
+   * @param timeout send timeout
+   **/
+  private void sendResponse(Socket s, short opStatus, long timeout) 
+                                                       throws IOException {
+    DataOutputStream reply = 
+      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+    try {
+      reply.writeShort(opStatus);
+      reply.flush();
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+}
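
For reference, here is a minimal client-side sketch of the OP_READ_BLOCK
exchange that DataXceiver.run() and readBlock() above implement. The host,
port, block id and generation stamp are hypothetical, and the three wire
constants are placeholders for the real values defined in FSConstants at
this revision:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;

    public class ReadBlockSketch {
      // Placeholder constants; the authoritative values live in FSConstants.
      static final short DATA_TRANSFER_VERSION = 14;
      static final byte OP_READ_BLOCK = (byte) 81;
      static final short OP_STATUS_SUCCESS = 0;

      public static void main(String[] args) throws IOException {
        Socket s = new Socket("datanode.example.com", 50010); // hypothetical
        try {
          DataOutputStream out = new DataOutputStream(s.getOutputStream());
          DataInputStream in = new DataInputStream(s.getInputStream());

          out.writeShort(DATA_TRANSFER_VERSION); // version handshake
          out.writeByte(OP_READ_BLOCK);          // opcode
          out.writeLong(1234L);                  // block id (hypothetical)
          out.writeLong(1000L);                  // generation stamp (hypothetical)
          out.writeLong(0L);                     // start offset within the block
          out.writeLong(4096L);                  // number of bytes to read
          out.flush();

          // The datanode answers with a status short before streaming the
          // checksum header and packetized block data (see BlockSender).
          short status = in.readShort();
          if (status != OP_STATUS_SUCCESS) {
            throw new IOException("read rejected, status=" + status);
          }
        } finally {
          s.close();
        }
      }
    }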

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Server used for receiving/sending a block of data.
+ * This is created to listen for requests from clients or 
+ * other DataNodes.  This small server does not use the 
+ * Hadoop IPC mechanism.
+ */
+class DataXceiverServer implements Runnable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  ServerSocket ss;
+  DataNode datanode;
+  // Record all sockets opened for data transfer
+  Map<Socket, Socket> childSockets = Collections.synchronizedMap(
+                                       new HashMap<Socket, Socket>());
+  
+  /**
+   * Maximum number of concurrent xceivers per node.
+   * Enforcing the limit is required to keep the datanode
+   * from running out of memory.
+   */
+  static final int MAX_XCEIVER_COUNT = 256;
+  int maxXceiverCount = MAX_XCEIVER_COUNT;
+
+  /**
+   * We need an estimate for block size to check if the disk partition has
+   * enough space. For now we set it to the default block size configured
+   * on the server side, which is not ideal because the
+   * default block size should be a client-side configuration.
+   * A better solution is to include in the header the estimated block size,
+   * i.e. either the actual block size or the default block size.
+   */
+  long estimateBlockSize;
+  
+  
+  DataXceiverServer(ServerSocket ss, Configuration conf, 
+      DataNode datanode) {
+    
+    this.ss = ss;
+    this.datanode = datanode;
+    
+    this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers",
+        MAX_XCEIVER_COUNT);
+    
+    this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+  }
+
+  /**
+   * Accept incoming connections and spawn a DataXceiver thread per connection.
+   */
+  public void run() {
+    while (datanode.shouldRun) {
+      try {
+        Socket s = ss.accept();
+        s.setTcpNoDelay(true);
+        new Daemon(datanode.threadGroup, 
+            new DataXceiver(s, datanode, this)).start();
+      } catch (IOException ie) {
+        LOG.warn(datanode.dnRegistration + ":DataXceiverServer: " 
+                                + StringUtils.stringifyException(ie));
+      } catch (Throwable te) {
+        LOG.error(datanode.dnRegistration + ":DataXceiverServer: Exiting due to:" 
+                                 + StringUtils.stringifyException(te));
+        datanode.shouldRun = false;
+      }
+    }
+    try {
+      ss.close();
+    } catch (IOException ie) {
+      LOG.warn(datanode.dnRegistration + ":DataXceiverServer: " 
+                              + StringUtils.stringifyException(ie));
+    }
+  }
+  
+  void kill() {
+    assert datanode.shouldRun == false :
+      "shoudRun should be set to false before killing";
+    try {
+      this.ss.close();
+    } catch (IOException ie) {
+      LOG.warn(datanode.dnRegistration + ":DataXceiverServer.kill(): " 
+                              + StringUtils.stringifyException(ie));
+    }
+
+    // close all the sockets that were accepted earlier
+    synchronized (childSockets) {
+      for (Iterator<Socket> it = childSockets.values().iterator();
+           it.hasNext();) {
+        Socket thissock = it.next();
+        try {
+          thissock.close();
+        } catch (IOException e) {
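+          // Ignore close errors; the server is shutting down anyway.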
+        }
+      }
+    }
+  }
+}
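
Both tunables read by the constructor above come from the standard
Configuration. A minimal sketch of overriding them (the values here are
arbitrary; note that the key name uses the historical spelling "xcievers"):

    import org.apache.hadoop.conf.Configuration;

    public class XceiverConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The key name retains the historical misspelling "xcievers".
        conf.setInt("dfs.datanode.max.xcievers", 1024);     // default is 256
        conf.setLong("dfs.block.size", 128L * 1024 * 1024); // 128 MB estimate
        System.out.println("max xceivers: "
            + conf.getInt("dfs.datanode.max.xcievers", 256));
      }
    }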

Copied: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (from r685529, hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?p2=hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java&p1=hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java&r1=685529&r2=685979&rev=685979&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Aug 14 10:58:30 2008
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -27,24 +27,25 @@
 import java.util.List;
 import java.util.Random;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-
-
-import junit.framework.TestCase;
 /**
 * This class tests if block replacement requests to data nodes work correctly.
  */
@@ -60,7 +61,7 @@
     final long TOTAL_BYTES =6*bandwidthPerSec; 
     long bytesToSend = TOTAL_BYTES; 
     long start = Util.now();
-    DataNode.Throttler throttler = new DataNode.Throttler(bandwidthPerSec);
+    BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
     long totalBytes = 0L;
     long bytesSent = 1024*512L; // 0.5MB
     throttler.throttle(bytesSent);

Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
------------------------------------------------------------------------------
    svn:mergeinfo =