Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2012/04/23 23:37:56 UTC

svn commit: r1329468 [2/2] - in /hadoop/common/branches/branch-0.22/hdfs: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/...

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Apr 23 21:37:55 2012
@@ -21,10 +21,13 @@ package org.apache.hadoop.hdfs.server.da
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -49,6 +52,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -64,25 +70,27 @@ import org.apache.hadoop.hdfs.HDFSPolicy
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -92,6 +100,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -101,7 +110,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -109,27 +117,24 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
-import java.lang.management.ManagementFactory;  
-
-import javax.management.MBeanServer; 
-import javax.management.ObjectName;
-
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -230,6 +235,8 @@ public class DataNode extends Configured
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
   
+  final String userWithLocalPathAccess;
+
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
   
@@ -276,6 +283,9 @@ public class DataNode extends Configured
 
     DataNode.setDataNode(this);
     
+    this.userWithLocalPathAccess = conf
+    .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
+
     try {
       startDataNode(conf, dataDirs, namenode, resources);
     } catch (IOException ie) {
@@ -1753,6 +1763,67 @@ public class DataNode extends Configured
         + ": " + protocol);
   }
 
+  /** Ensure the authentication method is kerberos */
+  private void checkKerberosAuthMethod(String msg) throws IOException {
+    // User invoking the call must be same as the datanode user
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != 
+      AuthenticationMethod.KERBEROS) {
+      throw new AccessControlException("Error in "+msg+". Only "
+          + "kerberos based authentication is allowed.");
+    }
+  }
+
+  private void checkBlockLocalPathAccess() throws IOException {
+    checkKerberosAuthMethod("getBlockLocalPathInfo()");
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (!currentUser.equals(this.userWithLocalPathAccess)) {
+      throw new AccessControlException(
+          "Can't continue with getBlockLocalPathInfo() "
+          + "authorization. The user " + currentUser
+          + " is not allowed to call getBlockLocalPathInfo");
+    }
+  }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    checkBlockLocalPathAccess();
+    checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+    if (LOG.isDebugEnabled()) {
+      if (info != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo successful block=" + block
+              + " blockfile " + info.getBlockPath() + " metafile "
+              + info.getMetaPath());
+        }
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo for block=" + block
+              + " returning null");
+        }
+      }
+    }
+    return info;
+  }
+
+  private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token,
+      AccessMode accessMode) throws IOException {
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      BlockTokenIdentifier id = new BlockTokenIdentifier();
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      id.readFields(in);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got: " + id.toString());
+      }
+      blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+    }
+  }
+
   /** A convenient class used in block recovery */
   static class BlockRecord { 
     final DatanodeID id;
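
The methods added above form the datanode side of short-circuit reads: getBlockLocalPathInfo() hands a co-located client the on-disk paths of a block, but only after checking Kerberos authentication (when security is enabled), the user configured via DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, and the block token. A minimal client-side sketch of the call, modeled on the TestShortCircuitLocalRead test added further down; the helper class, parameter names, and the 60000 ms timeout are illustrative, not part of this commit:

    // Sketch only (not part of this commit): how a local client obtains the
    // block and meta file paths through the new ClientDatanodeProtocol call.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.security.token.Token;

    public class LocalPathLookupSketch {
      public static BlockLocalPathInfo lookup(DatanodeInfo dnInfo, Block block,
          Token<BlockTokenIdentifier> token, Configuration conf)
          throws IOException {
        // Same proxy creation and timeout as in TestShortCircuitLocalRead.
        ClientDatanodeProtocol proxy =
            DFSClient.createClientDatanodeProtocolProxy(dnInfo, conf, 60000);
        try {
          // Fails with AccessControlException unless the caller is the
          // configured local-path-access user (Kerberos-authenticated when
          // security is on); when block tokens are enabled, the token must
          // also grant READ access to the block.
          return proxy.getBlockLocalPathInfo(block, token);
        } finally {
          RPC.stopProxy(proxy);
        }
      }
    }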

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Apr 23 21:37:55 2012
@@ -47,23 +47,24 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -953,6 +954,17 @@ public class FSDataset implements FSCons
   }
   
   @Override // FSDatasetInterface
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
+  throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = getMetaFile(datafile, block);
+    BlockLocalPathInfo info = new BlockLocalPathInfo(block,
+        datafile.getAbsolutePath(), metafile.getAbsolutePath());
+    return info;
+  }
+
+
+  @Override // FSDatasetInterface
   public synchronized InputStream getBlockInputStream(Block b) throws IOException {
     return new FileInputStream(getBlockFile(b));
   }
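
The implementation above resolves a block's data file and its checksum companion and returns their absolute paths in a BlockLocalPathInfo. That is what a co-located reader (the BlockReaderLocal referenced by the new test) consumes: with the paths in hand it can open the block file straight off the local disk instead of streaming it through the datanode. A rough, purely illustrative sketch of that consumption; the real local reader also validates checksums from the meta file unless checksum skipping is configured:

    // Illustrative only; not the actual BlockReaderLocal implementation.
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;

    public class LocalBlockOpenSketch {
      public static FileInputStream openBlockFile(BlockLocalPathInfo info,
          long offset) throws IOException {
        // The datanode returned absolute paths; open the block file directly.
        FileInputStream in = new FileInputStream(new File(info.getBlockPath()));
        // Checksums live in the companion file at info.getMetaPath(); a real
        // reader verifies the data against it unless skipping is enabled.
        if (in.skip(offset) != offset) {
          in.close();
          throw new IOException("Could not skip to offset " + offset);
        }
        return in;
      }
    }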

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Mon Apr 23 21:37:55 2012
@@ -25,11 +25,12 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -343,4 +344,9 @@ public interface FSDatasetInterface exte
                                           Block oldBlock,
                                           long recoveryId,
                                           long newLength) throws IOException;
+
+  /**
+   * Get {@link BlockLocalPathInfo} for the given block.
+   **/
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Apr 23 21:37:55 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -502,8 +503,8 @@ public class NamenodeFsck {
         s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
         
-        String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-        blockReader = BlockReader.newBlockReader(s, file, block, lblock
+        String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
+        blockReader = RemoteBlockReader.newBlockReader(s, file, block, lblock
             .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
         
       }  catch (IOException ex) {

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests Mon Apr 23 21:37:55 2012
@@ -11,6 +11,7 @@
 **/TestDatanodeDescriptor.java
 **/TestEditLog.java
 **/TestFileLimit.java
+**/TestShortCircuitLocalRead.java
 **/TestHeartbeatHandling.java
 **/TestHost2NodesMap.java
 **/TestNamenodeCapacityReport.java

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Apr 23 21:37:55 2012
@@ -132,7 +132,7 @@ public class BlockReaderTestUtil {
   /**
    * Get a BlockReader for the given block.
    */
-  public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
+  public RemoteBlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
       throws IOException {
     InetSocketAddress targetAddr = null;
     Socket sock = null;
@@ -143,7 +143,7 @@ public class BlockReaderTestUtil {
     sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    return BlockReader.newBlockReader(
+    return RemoteBlockReader.newBlockReader(
       sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Mon Apr 23 21:37:55 2012
@@ -54,7 +54,7 @@ public class TestClientBlockVerification
    */
   @Test
   public void testBlockVerification() throws Exception {
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
     reader.close();
@@ -65,7 +65,7 @@ public class TestClientBlockVerification
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
     // We asked the blockreader for the whole file, and only read
@@ -82,7 +82,7 @@ public class TestClientBlockVerification
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
+    RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@@ -101,7 +101,7 @@ public class TestClientBlockVerification
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+        RemoteBlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
         reader.close();

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java Mon Apr 23 21:37:55 2012
@@ -71,13 +71,13 @@ public class TestConnCache {
    * It verifies that all invocation to DFSInputStream.getBlockReader()
    * use the same socket.
    */
-  private class MockGetBlockReader implements Answer<BlockReader> {
-    public BlockReader reader = null;
+  private class MockGetBlockReader implements Answer<RemoteBlockReader> {
+    public RemoteBlockReader reader = null;
     private Socket sock = null;
 
-    public BlockReader answer(InvocationOnMock invocation) throws Throwable {
-      BlockReader prevReader = reader;
-      reader = (BlockReader) invocation.callRealMethod();
+    public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
+      RemoteBlockReader prevReader = reader;
+      reader = (RemoteBlockReader) invocation.callRealMethod();
       if (sock == null) {
         sock = reader.dnSock;
       } else if (prevReader != null && prevReader.hasSentStatusCode()) {

Added: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for short circuit read functionality using {@link BlockReaderLocal}.
+ * When a block being read by a client resides on the local datanode, the
+ * short circuit read allows the client to read the block file directly from
+ * the local file system instead of connecting to the datanode through
+ * {@link DataTransferProtocol}.
+ */
+public class TestShortCircuitLocalRead {
+  static final String DIR = MiniDFSCluster.getBaseDirectory()  + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCKSIZE = 5120;
+  boolean simulatedStorage = false;
+
+  // creates a file but does not close it
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+        fileSys.getConf().getInt("io.file.buffer.size", 4096),
+        (short)repl, (long)BLOCKSIZE);
+    return stm;
+  }
+
+  static private void checkData(byte[] actual, int from, byte[] expected,
+      String message) {
+    checkData(actual, from, expected, actual.length, message);
+  }
+
+  static private void checkData(byte[] actual, int from, byte[] expected,
+      int len, String message) {
+    for (int idx = 0; idx < len; idx++) {
+      if (expected[from + idx] != actual[idx]) {
+        Assert.fail(message + " byte " + (from + idx) + " differs. expected "
+            + expected[from + idx] + " actual " + actual[idx]);
+      }
+    }
+  }
+
+  static void checkFileContent(FileSystem fs, Path name, byte[] expected,
+      int readOffset) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[expected.length-readOffset];
+    stm.readFully(readOffset, actual);
+    checkData(actual, readOffset, expected, "Read 2");
+    stm.close();
+    // Now read using a different API.
+    actual = new byte[expected.length-readOffset];
+    stm = fs.open(name);
+    long skipped = stm.skip(readOffset);
+    Assert.assertEquals(skipped, readOffset);
+    //Read a small number of bytes first.
+    int nread = stm.read(actual, 0, 3);
+    nread += stm.read(actual, nread, 2);
+    //Read across chunk boundary
+    nread += stm.read(actual, nread, 517);
+    checkData(actual, readOffset, expected, nread, "A few bytes");
+    //Now read rest of it
+    while (nread < actual.length) {
+      int nbytes = stm.read(actual, nread, actual.length - nread);
+      if (nbytes < 0) {
+        throw new EOFException("End of file reached before reading fully.");
+      }
+      nread += nbytes;
+    }
+    checkData(actual, readOffset, expected, "Read 3");
+    stm.close();
+  }
+
+  /**
+   * Test that file data can be read by reading the block file
+   * directly from the local store.
+   */
+  public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+      int readOffset) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        ignoreChecksum);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
+    numDataNodes(8).
+    format(true).
+    build();
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      // check that / exists
+      Path path = new Path("/"); 
+      assertTrue("/ should be a directory", 
+          fs.getFileStatus(path).isDirectory() == true);
+
+      byte[] fileData = AppendTestUtil.randomBytes(SEED, size);
+      // create a new file in home directory. Do not close it.
+      Path file1 = new Path("filelocal.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+
+      // write to file
+      stm.write(fileData);
+      stm.close();
+      checkFileContent(fs, file1, fileData, readOffset);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileLocalReadNoChecksum() throws IOException {
+    doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 0);
+  }
+
+  @Test
+  public void testFileLocalReadChecksum() throws IOException {
+    doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 0);
+  }
+
+  @Test
+  public void testSmallFileLocalRead() throws IOException {
+    doTestShortCircuitRead(false, 13, 0);
+    doTestShortCircuitRead(false, 13, 5);
+    doTestShortCircuitRead(true, 13, 0);
+    doTestShortCircuitRead(true, 13, 5);
+  }
+
+  @Test
+  public void testReadFromAnOffset() throws IOException {
+    doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 777);
+    doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 777);
+  }
+
+  @Test
+  public void testLongFile() throws IOException {
+    doTestShortCircuitRead(false, 10*BLOCKSIZE+100, 777);
+    doTestShortCircuitRead(true, 10*BLOCKSIZE+100, 777);
+  }
+
+  @Test
+  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+    final Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitActive();
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
+      UserGroupInformation aUgi = UserGroupInformation
+      .createRemoteUser("alloweduser");
+      LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0,
+          16);
+      // Create a new block object, because the block inside LocatedBlock at
+      // namenode is of type BlockInfo.
+      Block blk = new Block(lb.get(0).getBlock());
+      Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
+      final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
+      ClientDatanodeProtocol proxy = aUgi
+      .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+        @Override
+        public ClientDatanodeProtocol run() throws Exception {
+          return DFSClient.createClientDatanodeProtocolProxy(
+              dnInfo, conf, 60000);
+        }
+      });
+
+      //This should succeed
+      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
+      Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
+          blpi.getBlockPath());
+      RPC.stopProxy(proxy);
+
+      // Now try with a not allowed user.
+      UserGroupInformation bUgi = UserGroupInformation
+      .createRemoteUser("notalloweduser");
+      proxy = bUgi
+      .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+        @Override
+        public ClientDatanodeProtocol run() throws Exception {
+          return DFSClient.createClientDatanodeProtocolProxy(
+              dnInfo, conf, 60000);
+        }
+      });
+      try {
+        proxy.getBlockLocalPathInfo(blk, token);
+        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+            + " is not allowed to call getBlockLocalPathInfo");
+      } catch (IOException ex) {
+        Assert.assertTrue(ex.getMessage().contains(
+        "not allowed to call getBlockLocalPathInfo"));
+      } finally {
+        RPC.stopProxy(proxy);
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+ * Benchmark short circuit read against regular read with the specified
+ * number of threads reading simultaneously.
+   * <br>
+   * Run this using the following command:
+   * bin/hadoop --config confdir \
+   * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
+ * <shortcircuit on?> <skip checksum?> <number of threads>
+   */
+  public static void main(String[] args) throws Exception {
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO);
+
+    if (args.length != 3) {
+      System.out.println("Usage: test shortcircuit checksum threadCount");
+      System.exit(1);
+    }
+    boolean shortcircuit = Boolean.valueOf(args[0]);
+    boolean checksum = Boolean.valueOf(args[1]);
+    int threadCount = Integer.valueOf(args[2]);
+
+    // Setup create a file
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        checksum);
+
+    //Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
+    int fileSize = 1000 * BLOCKSIZE + 100; // File with 1000 blocks
+    final byte [] dataToWrite = AppendTestUtil.randomBytes(SEED, fileSize);
+
+    // create a new file in home directory. Do not close it.
+    final Path file1 = new Path("filelocal.dat");
+    final FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream stm = createFile(fs, file1, 1);
+
+    stm.write(dataToWrite);
+    stm.close();
+
+    long start = System.currentTimeMillis();
+    final int iteration = 20;
+    Thread[] threads = new Thread[threadCount];
+    for (int i = 0; i < threadCount; i++) {
+      threads[i] = new Thread() {
+        public void run() {
+          for (int i = 0; i < iteration; i++) {
+            try {
+              checkFileContent(fs, file1, dataToWrite, 0);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].start();
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].join();
+    }
+    long end = System.currentTimeMillis();
+    System.out.println("Iteration " + iteration + " took " + (end - start));
+    fs.delete(file1, false);
+  }
+}
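
The benchmark entry point above takes three positional arguments: whether short-circuit read is enabled, whether checksum skipping is enabled, and the number of reader threads; each thread then performs 20 read iterations. An example invocation (the config directory and argument values are placeholders, not taken from this commit):

    bin/hadoop --config /path/to/confdir \
        org.apache.hadoop.hdfs.TestShortCircuitLocalRead true false 4

This runs four concurrent readers with short-circuit reads enabled and checksum verification left on.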

Propchange: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Mon Apr 23 21:37:55 2012
@@ -34,11 +34,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -836,4 +837,9 @@ public class SimulatedFSDataset  impleme
   public long getReplicaVisibleLength(Block block) throws IOException {
     return block.getNumBytes();
   }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException {
+    throw new IOException("getBlockLocalPathInfo not supported.");
+  }
 }

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Apr 23 21:37:55 2012
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -30,22 +33,21 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 /**
  * Fine-grain testing of block files and locations after volume failure.
@@ -263,9 +265,9 @@ public class TestDataNodeVolumeFailure {
     s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    String file = BlockReader.getFileName(targetAddr, block.getBlockId());
+    String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
     blockReader = 
-      BlockReader.newBlockReader(s, file, block, lblock
+      RemoteBlockReader.newBlockReader(s, file, block, lblock
         .getBlockToken(), 0, -1, 4096);
 
     // nothing - if it fails - it will throw an exception

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java Mon Apr 23 21:37:55 2012
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -58,7 +58,7 @@ public class TestDataXceiver {
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
+    RemoteBlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
     DataNode dn = util.getDataNode(testBlock);
     DataBlockScanner scanner = spy(dn.blockScanner);
     dn.blockScanner = scanner;

Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Mon Apr 23 21:37:55 2012
@@ -24,29 +24,33 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.token.*;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 
-import junit.framework.TestCase;
-
 public class TestBlockTokenWithDFS extends TestCase {
 
   private static final int BLOCK_SIZE = 1024;
@@ -130,8 +134,8 @@ public class TestBlockTokenWithDFS exten
       s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
       s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-      String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-      blockReader = BlockReader.newBlockReader(s, file, block, 
+      String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
+      blockReader = RemoteBlockReader.newBlockReader(s, file, block, 
           lblock.getBlockToken(), 0, -1, 
           conf.getInt("io.file.buffer.size", 4096));