You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2012/04/23 23:37:56 UTC
svn commit: r1329468 [2/2] - in /hadoop/common/branches/branch-0.22/hdfs: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/common/
src/java/org/apache/hadoop/hdfs/server/datanode/ src/...
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Apr 23 21:37:55 2012
@@ -21,10 +21,13 @@ package org.apache.hadoop.hdfs.server.da
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -49,6 +52,9 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -64,25 +70,27 @@ import org.apache.hadoop.hdfs.HDFSPolicy
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -92,6 +100,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -101,7 +110,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
@@ -109,27 +117,24 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.mortbay.util.ajax.JSON;
-import java.lang.management.ManagementFactory;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
@@ -230,6 +235,8 @@ public class DataNode extends Configured
BlockTokenSecretManager blockTokenSecretManager;
boolean isBlockTokenInitialized = false;
+ final String userWithLocalPathAccess;
+
public DataBlockScanner blockScanner = null;
public Daemon blockScannerThread = null;
@@ -276,6 +283,9 @@ public class DataNode extends Configured
DataNode.setDataNode(this);
+ this.userWithLocalPathAccess = conf
+ .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
+
try {
startDataNode(conf, dataDirs, namenode, resources);
} catch (IOException ie) {
@@ -1753,6 +1763,67 @@ public class DataNode extends Configured
+ ": " + protocol);
}
+ /** Ensure the authentication method is kerberos when security is enabled. */
+ private void checkKerberosAuthMethod(String msg) throws IOException {
+ // Nothing to verify when security is disabled
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() !=
+ AuthenticationMethod.KERBEROS) {
+ throw new AccessControlException("Error in "+msg+". Only "
+ + "kerberos based authentication is allowed.");
+ }
+ }
+
+ private void checkBlockLocalPathAccess() throws IOException { // gate for getBlockLocalPathInfo()
+ checkKerberosAuthMethod("getBlockLocalPathInfo()");
+ String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+ if (!currentUser.equals(this.userWithLocalPathAccess)) { // equals(null) is false: a null configured user rejects every caller
+ throw new AccessControlException(
+ "Can't continue with getBlockLocalPathInfo() "
+ + "authorization. The user " + currentUser
+ + " is not allowed to call getBlockLocalPathInfo");
+ }
+ }
+
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ checkBlockLocalPathAccess(); // caller must be the configured local-path user
+ checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); // token is checked for READ access
+ BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+ if (LOG.isDebugEnabled()) { // outer guard only; the messages below are emitted at trace level
+ if (info != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("getBlockLocalPathInfo successful block=" + block
+ + " blockfile " + info.getBlockPath() + " metafile "
+ + info.getMetaPath());
+ }
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("getBlockLocalPathInfo for block=" + block
+ + " returning null");
+ }
+ }
+ }
+ return info;
+ }
+
+ private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token,
+ AccessMode accessMode) throws IOException {
+ if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) { // otherwise the token is not inspected at all
+ BlockTokenIdentifier id = new BlockTokenIdentifier();
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ id.readFields(in); // populate the identifier from the bytes carried by the token
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got: " + id.toString());
+ }
+ blockTokenSecretManager.checkAccess(id, null, block, accessMode); // NOTE(review): second argument is null - confirm checkAccess accepts it
+ }
+ }
+
/** A convenient class used in block recovery */
static class BlockRecord {
final DatanodeID id;
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Apr 23 21:37:55 2012
@@ -47,23 +47,24 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
/**************************************************
* FSDataset manages a set of data blocks. Each block
@@ -953,6 +954,17 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block block) // NOTE(review): not synchronized, unlike getBlockInputStream below - confirm intended
+ throws IOException {
+ File datafile = getBlockFile(block);
+ File metafile = getMetaFile(datafile, block);
+ BlockLocalPathInfo info = new BlockLocalPathInfo(block, // carries the absolute paths of the block file and its meta file
+ datafile.getAbsolutePath(), metafile.getAbsolutePath());
+ return info;
+ }
+
+
+ @Override // FSDatasetInterface
public synchronized InputStream getBlockInputStream(Block b) throws IOException {
return new FileInputStream(getBlockFile(b));
}
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Mon Apr 23 21:37:55 2012
@@ -25,11 +25,12 @@ import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -343,4 +344,9 @@ public interface FSDatasetInterface exte
Block oldBlock,
long recoveryId,
long newLength) throws IOException;
+
+ /**
+ * Get the {@link BlockLocalPathInfo} describing the given block; implementations may throw IOException.
+ */
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException;
}
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Apr 23 21:37:55 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -502,8 +503,8 @@ public class NamenodeFsck {
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr, block.getBlockId());
- blockReader = BlockReader.newBlockReader(s, file, block, lblock
+ String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
+ blockReader = RemoteBlockReader.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
} catch (IOException ex) {
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests Mon Apr 23 21:37:55 2012
@@ -11,6 +11,7 @@
**/TestDatanodeDescriptor.java
**/TestEditLog.java
**/TestFileLimit.java
+**/TestShortCircuitLocalRead.java
**/TestHeartbeatHandling.java
**/TestHost2NodesMap.java
**/TestNamenodeCapacityReport.java
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Apr 23 21:37:55 2012
@@ -132,7 +132,7 @@ public class BlockReaderTestUtil {
/**
* Get a BlockReader for the given block.
*/
- public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
+ public RemoteBlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
throws IOException {
InetSocketAddress targetAddr = null;
Socket sock = null;
@@ -143,7 +143,7 @@ public class BlockReaderTestUtil {
sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- return BlockReader.newBlockReader(
+ return RemoteBlockReader.newBlockReader(
sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
testBlock.getBlockToken(),
offset, lenToRead,
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Mon Apr 23 21:37:55 2012
@@ -54,7 +54,7 @@ public class TestClientBlockVerification
*/
@Test
public void testBlockVerification() throws Exception {
- BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+ RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
@@ -65,7 +65,7 @@ public class TestClientBlockVerification
*/
@Test
public void testIncompleteRead() throws Exception {
- BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+ RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
// We asked the blockreader for the whole file, and only read
@@ -82,7 +82,7 @@ public class TestClientBlockVerification
@Test
public void testCompletePartialRead() throws Exception {
// Ask for half the file
- BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
+ RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@@ -101,7 +101,7 @@ public class TestClientBlockVerification
for (int length : lengths) {
DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
" len=" + length);
- BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+ RemoteBlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java Mon Apr 23 21:37:55 2012
@@ -71,13 +71,13 @@ public class TestConnCache {
* It verifies that all invocation to DFSInputStream.getBlockReader()
* use the same socket.
*/
- private class MockGetBlockReader implements Answer<BlockReader> {
- public BlockReader reader = null;
+ private class MockGetBlockReader implements Answer<RemoteBlockReader> {
+ public RemoteBlockReader reader = null;
private Socket sock = null;
- public BlockReader answer(InvocationOnMock invocation) throws Throwable {
- BlockReader prevReader = reader;
- reader = (BlockReader) invocation.callRealMethod();
+ public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
+ RemoteBlockReader prevReader = reader;
+ reader = (RemoteBlockReader) invocation.callRealMethod();
if (sock == null) {
sock = reader.dnSock;
} else if (prevReader != null && prevReader.hasSentStatusCode()) {
Added: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for short circuit read functionality using {@link BlockReaderLocal}.
+ * When a block being read by a client is on the local datanode, instead of
+ * connecting to the datanode over {@link DataTransferProtocol}, the short
+ * circuit read allows reading the data directly from the files on the local
+ * file system.
+ */
+public class TestShortCircuitLocalRead {
+ static final String DIR = MiniDFSCluster.getBaseDirectory() + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+
+ static final long SEED = 0xDEADBEEFL;
+ static final int BLOCKSIZE = 5120;
+ boolean simulatedStorage = false;
+
+ // creates a file but does not close it; the caller owns the returned stream
+ static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+ throws IOException {
+ FSDataOutputStream stm = fileSys.create(name, true,
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
+ (short)repl, (long)BLOCKSIZE);
+ return stm;
+ }
+
+ private static void checkData(byte[] actual, int from, byte[] expected,
+ String message) {
+ checkData(actual, from, expected, actual.length, message);
+ }
+
+ private static void checkData(byte[] actual, int from, byte[] expected,
+ int len, String message) {
+ for (int idx = 0; idx < len; idx++) {
+ if (expected[from + idx] != actual[idx]) {
+ Assert.fail(message + " byte " + (from + idx) + " differs. expected "
+ + expected[from + idx] + " actual " + actual[idx]);
+ }
+ }
+ }
+
+ static void checkFileContent(FileSystem fs, Path name, byte[] expected,
+ int readOffset) throws IOException {
+ FSDataInputStream stm = fs.open(name);
+ byte[] actual = new byte[expected.length-readOffset];
+ stm.readFully(readOffset, actual);
+ checkData(actual, readOffset, expected, "Read 2");
+ stm.close();
+ // Now read using a different API.
+ actual = new byte[expected.length-readOffset];
+ stm = fs.open(name);
+ long skipped = stm.skip(readOffset);
+ Assert.assertEquals(readOffset, skipped);
+ //Read a small number of bytes first.
+ int nread = stm.read(actual, 0, 3);
+ nread += stm.read(actual, nread, 2);
+ //Read across chunk boundary
+ nread += stm.read(actual, nread, 517);
+ checkData(actual, readOffset, expected, nread, "A few bytes");
+ //Now read rest of it
+ while (nread < actual.length) {
+ int nbytes = stm.read(actual, nread, actual.length - nread);
+ if (nbytes < 0) {
+ throw new EOFException("End of file reached before reading fully.");
+ }
+ nread += nbytes;
+ }
+ checkData(actual, readOffset, expected, "Read 3");
+ stm.close();
+ }
+
+ /**
+ * Test that file data can be read by reading the block file
+ * directly from the local store.
+ */
+ public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+ int readOffset) throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ ignoreChecksum);
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ if (simulatedStorage) {
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ }
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
+ numDataNodes(8).
+ format(true).
+ build();
+ FileSystem fs = cluster.getFileSystem();
+ try {
+ // check that / exists
+ Path path = new Path("/");
+ assertTrue("/ should be a directory",
+ fs.getFileStatus(path).isDirectory());
+
+ byte[] fileData = AppendTestUtil.randomBytes(SEED, size);
+ // create a new file in home directory; it is closed after the write below.
+ Path file1 = new Path("filelocal.dat");
+ FSDataOutputStream stm = createFile(fs, file1, 1);
+
+ // write to file
+ stm.write(fileData);
+ stm.close();
+ checkFileContent(fs, file1, fileData, readOffset);
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFileLocalReadNoChecksum() throws IOException {
+ doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 0);
+ }
+
+ @Test
+ public void testFileLocalReadChecksum() throws IOException {
+ doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 0);
+ }
+
+ @Test
+ public void testSmallFileLocalRead() throws IOException {
+ doTestShortCircuitRead(false, 13, 0);
+ doTestShortCircuitRead(false, 13, 5);
+ doTestShortCircuitRead(true, 13, 0);
+ doTestShortCircuitRead(true, 13, 5);
+ }
+
+ @Test
+ public void testReadFromAnOffset() throws IOException {
+ doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 777);
+ doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 777);
+ }
+
+ @Test
+ public void testLongFile() throws IOException {
+ doTestShortCircuitRead(false, 10*BLOCKSIZE+100, 777);
+ doTestShortCircuitRead(true, 10*BLOCKSIZE+100, 777);
+ }
+
+ @Test
+ public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+ final Configuration conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ cluster.waitActive();
+ final DataNode dn = cluster.getDataNodes().get(0);
+ FileSystem fs = cluster.getFileSystem();
+ try {
+ DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
+ UserGroupInformation aUgi = UserGroupInformation
+ .createRemoteUser("alloweduser");
+ LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0,
+ 16);
+ // Create a new block object, because the block inside LocatedBlock at
+ // namenode is of type BlockInfo.
+ Block blk = new Block(lb.get(0).getBlock());
+ Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
+ final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
+ ClientDatanodeProtocol proxy = aUgi
+ .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+ @Override
+ public ClientDatanodeProtocol run() throws Exception {
+ return DFSClient.createClientDatanodeProtocolProxy(
+ dnInfo, conf, 60000);
+ }
+ });
+
+ //This should succeed
+ BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
+ Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
+ blpi.getBlockPath());
+ RPC.stopProxy(proxy);
+
+ // Now try with a not allowed user.
+ UserGroupInformation bUgi = UserGroupInformation
+ .createRemoteUser("notalloweduser");
+ proxy = bUgi
+ .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+ @Override
+ public ClientDatanodeProtocol run() throws Exception {
+ return DFSClient.createClientDatanodeProtocolProxy(
+ dnInfo, conf, 60000);
+ }
+ });
+ try {
+ proxy.getBlockLocalPathInfo(blk, token);
+ Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+ + " is not allowed to call getBlockLocalPathInfo");
+ } catch (IOException ex) {
+ Assert.assertTrue(ex.getMessage().contains(
+ "not allowed to call getBlockLocalPathInfo"));
+ } finally {
+ RPC.stopProxy(proxy);
+ }
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test to run benchmarks between shortcircuit read vs regular read with
+ * specified number of threads simultaneously reading.
+ * <br>
+ * Run this using the following command:
+ * bin/hadoop --config confdir \
+ * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
+ * &lt;shortcircuit on?&gt; &lt;checksum on?&gt; &lt;Number of threads&gt;
+ */
+ public static void main(String[] args) throws Exception {
+ ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO);
+
+ if (args.length != 3) {
+ System.out.println("Usage: test shortcircuit checksum threadCount");
+ System.exit(1);
+ }
+ boolean shortcircuit = Boolean.parseBoolean(args[0]);
+ boolean checksum = Boolean.parseBoolean(args[1]);
+ int threadCount = Integer.parseInt(args[2]);
+
+ // Set up the configuration, then create a file
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ !checksum); // the key means "skip checksum", so invert the "checksum on?" argument
+
+ // Use a much larger file than the unit tests do, for the benchmark
+ int fileSize = 1000 * BLOCKSIZE + 100; // File with 1000 blocks
+ final byte [] dataToWrite = AppendTestUtil.randomBytes(SEED, fileSize);
+
+ // create a new file in home directory; it is closed after the write below.
+ final Path file1 = new Path("filelocal.dat");
+ final FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream stm = createFile(fs, file1, 1);
+
+ stm.write(dataToWrite);
+ stm.close();
+
+ long start = System.currentTimeMillis();
+ final int iteration = 20;
+ Thread[] threads = new Thread[threadCount];
+ for (int i = 0; i < threadCount; i++) {
+ threads[i] = new Thread() {
+ public void run() {
+ for (int iter = 0; iter < iteration; iter++) {
+ try {
+ checkFileContent(fs, file1, dataToWrite, 0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ }
+ for (int i = 0; i < threadCount; i++) {
+ threads[i].start();
+ }
+ for (int i = 0; i < threadCount; i++) {
+ threads[i].join();
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("Iteration " + iteration + " took " + (end - start));
+ fs.delete(file1, false);
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Mon Apr 23 21:37:55 2012
@@ -34,11 +34,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -836,4 +837,9 @@ public class SimulatedFSDataset impleme
public long getReplicaVisibleLength(Block block) throws IOException {
return block.getNumBytes();
}
+
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException {
+ throw new IOException("getBlockLocalPathInfo not supported.");
+ }
}
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Apr 23 21:37:55 2012
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -30,22 +33,21 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
/**
* Fine-grain testing of block files and locations after volume failure.
@@ -263,9 +265,9 @@ public class TestDataNodeVolumeFailure {
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr, block.getBlockId());
+ String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
blockReader =
- BlockReader.newBlockReader(s, file, block, lblock
+ RemoteBlockReader.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, 4096);
// nothing - if it fails - it will throw an exception
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java Mon Apr 23 21:37:55 2012
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.da
import java.util.List;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -58,7 +58,7 @@ public class TestDataXceiver {
@Test
public void testCompletePartialRead() throws Exception {
// Ask for half the file
- BlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
+ RemoteBlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
DataNode dn = util.getDataNode(testBlock);
DataBlockScanner scanner = spy(dn.blockScanner);
dn.blockScanner = scanner;
Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Mon Apr 23 21:37:55 2012
@@ -24,29 +24,33 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Random;
+import junit.framework.TestCase;
+
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.token.*;
+import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Level;
-import junit.framework.TestCase;
-
public class TestBlockTokenWithDFS extends TestCase {
private static final int BLOCK_SIZE = 1024;
@@ -130,8 +134,8 @@ public class TestBlockTokenWithDFS exten
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr, block.getBlockId());
- blockReader = BlockReader.newBlockReader(s, file, block,
+ String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
+ blockReader = RemoteBlockReader.newBlockReader(s, file, block,
lblock.getBlockToken(), 0, -1,
conf.getInt("io.file.buffer.size", 4096));