Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/04/14 01:06:00 UTC
svn commit: r1467713 [4/4] - in
/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/proto...
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Sat Apr 13 23:05:54 2013
@@ -21,10 +21,13 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.assertFalse;
import java.io.EOFException;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,7 +35,6 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -40,15 +42,21 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.*;
/**
* Test for short circuit read functionality using {@link BlockReaderLocal}.
@@ -58,9 +66,24 @@ import org.junit.Test;
* system.
*/
public class TestShortCircuitLocalRead {
+ private static TemporarySocketDirectory sockDir;
- static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
+ @Before
+ public void before() {
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ }
+
static final long seed = 0xDEADBEEFL;
static final int blockSize = 5120;
boolean simulatedStorage = false;
@@ -84,7 +107,9 @@ public class TestShortCircuitLocalRead {
for (int idx = 0; idx < len; idx++) {
if (expected[from + idx] != actual[idx]) {
Assert.fail(message + " byte " + (from + idx) + " differs. expected "
- + expected[from + idx] + " actual " + actual[idx]);
+ + expected[from + idx] + " actual " + actual[idx] +
+ "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+ "\nactual: " + StringUtils.byteToHexString(actual, 0, len));
}
}
}
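The richer failure message above dumps both buffers as hex via StringUtils.byteToHexString(buf, start, end), which renders buf[start..end) and makes it far easier to see where the regions diverge than the old single-byte report. A small standalone illustration, using the StringUtils import this hunk adds:

    // byteToHexString(buf, start, end) renders buf[start..end) as hex,
    // so a mismatch now shows the whole compared region, not one byte.
    byte[] expected = { 0x00, 0x11, 0x22 };
    byte[] actual   = { 0x00, 0x11, 0x23 };
    System.out.println("expected: " + StringUtils.byteToHexString(expected, 0, 3));
    System.out.println("actual:   " + StringUtils.byteToHexString(actual, 0, 3));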
@@ -96,11 +121,13 @@ public class TestShortCircuitLocalRead {
/** Check file content, reading as user {@code readingUser} */
static void checkFileContent(URI uri, Path name, byte[] expected,
int readOffset, String readingUser, Configuration conf,
- boolean shortCircuitFails)
+ boolean legacyShortCircuitFails)
throws IOException, InterruptedException {
// Ensure short circuit is enabled
DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
- assertTrue(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+ }
FSDataInputStream stm = fs.open(name);
byte[] actual = new byte[expected.length-readOffset];
@@ -127,9 +154,8 @@ public class TestShortCircuitLocalRead {
}
checkData(actual, readOffset, expected, "Read 3");
- if (shortCircuitFails) {
- // short circuit should be disabled due to failure
- assertFalse(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertFalse(fs.getClient().useLegacyBlockReaderLocal());
}
stm.close();
}
@@ -145,11 +171,13 @@ public class TestShortCircuitLocalRead {
/** Check the file content, reading as user {@code readingUser} */
static void checkFileContentDirect(URI uri, Path name, byte[] expected,
int readOffset, String readingUser, Configuration conf,
- boolean shortCircuitFails)
+ boolean legacyShortCircuitFails)
throws IOException, InterruptedException {
// Ensure short circuit is enabled
DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
- assertTrue(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+ }
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
@@ -180,33 +208,45 @@ public class TestShortCircuitLocalRead {
nread += nbytes;
}
checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
- if (shortCircuitFails) {
- // short circuit should be disabled due to failure
- assertFalse(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertFalse(fs.getClient().useLegacyBlockReaderLocal());
}
stm.close();
}
+ public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
+ int readOffset, String shortCircuitUser, String readingUser,
+ boolean legacyShortCircuitFails) throws IOException, InterruptedException {
+ doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+ shortCircuitUser, readingUser, legacyShortCircuitFails);
+ }
+
public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
int readOffset) throws IOException, InterruptedException {
String shortCircuitUser = getCurrentUser();
- doTestShortCircuitRead(ignoreChecksum, size, readOffset, shortCircuitUser,
- shortCircuitUser, false);
+ doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+ null, getCurrentUser(), false);
}
/**
* Test that file data can be read by reading the block file
* directly from the local store.
*/
- public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+ public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
int readOffset, String shortCircuitUser, String readingUser,
- boolean shortCircuitFails) throws IOException, InterruptedException {
+ boolean legacyShortCircuitFails) throws IOException, InterruptedException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
ignoreChecksum);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- shortCircuitUser);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
+ if (shortCircuitUser != null) {
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ shortCircuitUser);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+ }
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
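The configuration rewrite above is the heart of this change: the whitelist-based setting now only drives the legacy code path, while the default path talks to the datanode over a UNIX domain socket. A minimal sketch of the two modes, using only the DFSConfigKeys constants visible in this hunk (the socket path and user name below are hypothetical):

    Configuration conf = new Configuration();

    // New-style short-circuit: block data is exchanged over a UNIX domain
    // socket; the "_PORT" token in the path is replaced with the
    // datanode's actual port at runtime.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
        "/var/run/hdfs/dn._PORT.sock");            // hypothetical path

    // Legacy short-circuit: the client opens block files directly, so the
    // reading user must be whitelisted on the datanode.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
        true);
    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
        "alice");                                  // hypothetical user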
@@ -228,9 +268,9 @@ public class TestShortCircuitLocalRead {
URI uri = cluster.getURI();
checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
- shortCircuitFails);
+ legacyShortCircuitFails);
checkFileContentDirect(uri, file1, fileData, readOffset, readingUser,
- conf, shortCircuitFails);
+ conf, legacyShortCircuitFails);
} finally {
fs.close();
cluster.shutdown();
@@ -255,6 +295,12 @@ public class TestShortCircuitLocalRead {
doTestShortCircuitRead(true, 13, 5);
}
+ @Test(timeout=10000)
+ public void testLocalReadLegacy() throws Exception {
+ doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(),
+ getCurrentUser(), false);
+ }
+
/**
 * Try a short circuit from a reader that is not allowed
 * to use short circuit. The test ensures the reader falls back to non
@@ -262,7 +308,7 @@ public class TestShortCircuitLocalRead {
*/
@Test(timeout=10000)
public void testLocalReadFallback() throws Exception {
- doTestShortCircuitRead(true, 13, 0, getCurrentUser(), "notallowed", true);
+ doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
}
@Test(timeout=10000)
@@ -276,7 +322,7 @@ public class TestShortCircuitLocalRead {
doTestShortCircuitRead(false, 10*blockSize+100, 777);
doTestShortCircuitRead(true, 10*blockSize+100, 777);
}
-
+
private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
final DatanodeID dnInfo, final Configuration conf) throws IOException,
InterruptedException {
@@ -301,21 +347,15 @@ public class TestShortCircuitLocalRead {
}
@Test(timeout=10000)
- public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+ public void testDeprecatedGetBlockLocalPathInfoRpc()
+ throws IOException, InterruptedException {
final Configuration conf = new Configuration();
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- "alloweduser1,alloweduser2");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
cluster.waitActive();
- final DataNode dn = cluster.getDataNodes().get(0);
FileSystem fs = cluster.getFileSystem();
try {
DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
- UserGroupInformation aUgi1 =
- UserGroupInformation.createRemoteUser("alloweduser1");
- UserGroupInformation aUgi2 =
- UserGroupInformation.createRemoteUser("alloweduser2");
LocatedBlocks lb = cluster.getNameNode().getRpcServer()
.getBlockLocations("/tmp/x", 0, 16);
// Create a new block object, because the block inside LocatedBlock at
@@ -323,29 +363,11 @@ public class TestShortCircuitLocalRead {
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
- ClientDatanodeProtocol proxy = getProxy(aUgi1, dnInfo, conf);
- // This should succeed
- BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Try with the other allowed user
- proxy = getProxy(aUgi2, dnInfo, conf);
-
- // This should succeed as well
- blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Now try with a disallowed user
- UserGroupInformation bUgi = UserGroupInformation
- .createRemoteUser("notalloweduser");
- proxy = getProxy(bUgi, dnInfo, conf);
+ ClientDatanodeProtocol proxy =
+ DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
try {
proxy.getBlockLocalPathInfo(blk, token);
- Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+ Assert.fail("The call should have failed as this user "
+ " is not allowed to call getBlockLocalPathInfo");
} catch (IOException ex) {
Assert.assertTrue(ex.getMessage().contains(
@@ -363,8 +385,9 @@ public class TestShortCircuitLocalRead {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- getCurrentUser());
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ "/tmp/testSkipWithVerifyChecksum._PORT");
+ DomainSocket.disableBindPathValidation();
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@@ -402,6 +425,88 @@ public class TestShortCircuitLocalRead {
cluster.shutdown();
}
}
+
+ @Test
+ public void testHandleTruncatedBlockFile() throws IOException {
+ MiniDFSCluster cluster = null;
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ "/tmp/testHandleTruncatedBlockFile._PORT");
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+ final Path TEST_PATH = new Path("/a");
+ final Path TEST_PATH2 = new Path("/b");
+ final long RANDOM_SEED = 4567L;
+ final long RANDOM_SEED2 = 4568L;
+ FSDataInputStream fsIn = null;
+ final int TEST_LENGTH = 3456;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_LENGTH, (short)1, RANDOM_SEED);
+ DFSTestUtil.createFile(fs, TEST_PATH2,
+ TEST_LENGTH, (short)1, RANDOM_SEED2);
+ fsIn = cluster.getFileSystem().open(TEST_PATH2);
+ byte original[] = new byte[TEST_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+ fsIn.close();
+ fsIn = null;
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ File dataFile = MiniDFSCluster.getBlockFile(0, block);
+ cluster.shutdown();
+ cluster = null;
+ RandomAccessFile raf = null;
+ try {
+ raf = new RandomAccessFile(dataFile, "rw");
+ raf.setLength(0);
+ } finally {
+ if (raf != null) raf.close();
+ }
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ fsIn = fs.open(TEST_PATH);
+ try {
+ byte buf[] = new byte[100];
+ fsIn.seek(2000);
+ fsIn.readFully(buf, 0, buf.length);
+ Assert.fail("shouldn't be able to read from corrupt 0-length " +
+ "block file.");
+ } catch (IOException e) {
+ DFSClient.LOG.error("caught exception ", e);
+ }
+ fsIn.close();
+ fsIn = null;
+
+ // We should still be able to read the other file.
+ // This is important because it indicates that we detected that the
+ // previous block was corrupt, rather than blaming the problem on
+ // communication.
+ fsIn = fs.open(TEST_PATH2);
+ byte buf[] = new byte[original.length];
+ fsIn.readFully(buf, 0, buf.length);
+ TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+ original.length);
+ fsIn.close();
+ fsIn = null;
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
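The corruption step in this test relies on a simple trick: with the cluster shut down (so no datanode holds the file open), truncate the replica's block file on disk to zero length; after restart the datanode still advertises the block, and the client must detect the short read as corruption rather than a communication problem. As an isolated sketch, using the File and RandomAccessFile imports this patch adds (the path is hypothetical):

    // Truncate a block file to zero bytes to simulate a corrupt replica.
    // Only safe while the cluster is down and nothing has the file open.
    File dataFile = new File("/data/dn/current/blk_1234");  // hypothetical
    RandomAccessFile raf = new RandomAccessFile(dataFile, "rw");
    try {
      raf.setLength(0);
    } finally {
      raf.close();
    }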
/**
* Test to run benchmarks between short circuit read vs regular read with
@@ -424,6 +529,8 @@ public class TestShortCircuitLocalRead {
// Setup create a file
final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ "/tmp/TestShortCircuitLocalRead._PORT");
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
checksum);
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Sat Apr 13 23:05:54 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -145,8 +146,9 @@ public class TestBlockTokenWithDFS {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
- conf, s, file, block,
- lblock.getBlockToken(), 0, -1, null);
+ conf, file, block, lblock.getBlockToken(), 0, -1,
+ true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
+ nodes[0], null, false);
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
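The factory signature changed with this merge: instead of a raw Socket, newBlockReader now takes a Peer (obtained here via TcpPeerServer.peerFromSocket), a client name, and the target datanode. A sketch of the call shape, with the roles of the trailing arguments inferred from this hunk rather than confirmed (they appear to be a peer cache and a short-circuit toggle):

    BlockReader blockReader = BlockReaderFactory.newBlockReader(
        conf, file, block, lblock.getBlockToken(),
        0, -1,                            // start offset; -1 = read to end
        true,                             // verify checksums
        "TestBlockTokenWithDFS",          // client name (shows up in logs)
        TcpPeerServer.peerFromSocket(s),  // Peer wrapping the TCP socket
        nodes[0],                         // datanode to read from
        null, false);                     // inferred: peer cache, short-circuit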
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Apr 13 23:05:54 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implemen
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
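SimulatedFSDataset keeps block contents in memory, so it has no file descriptors to hand out and UnsupportedOperationException is the honest stub. For contrast, a hypothetical file-backed dataset would return open streams over the block file and its checksum file, roughly:

    // Hypothetical file-backed variant; getBlockFile/getMetaFile are
    // assumed helpers that locate the on-disk data and checksum files.
    @Override
    public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock b)
        throws IOException {
      File blockFile = getBlockFile(b);
      File metaFile = getMetaFile(b);
      return new FileInputStream[] {
          new FileInputStream(blockFile),
          new FileInputStream(metaFile) };
    }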
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Sat Apr 13 23:05:54 2013
@@ -32,11 +32,13 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -280,10 +282,11 @@ public class TestDataNodeVolumeFailure {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
- BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
- .getBlockToken(), 0, -1, null);
-
- // nothing - if it fails - it will throw and exception
+ BlockReader blockReader =
+ BlockReaderFactory.newBlockReader(conf, file, block,
+ lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
+ TcpPeerServer.peerFromSocket(s), datanode, null, false);
+ blockReader.close(null, null);
}
/**
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Sat Apr 13 23:05:54 2013
@@ -1129,7 +1129,7 @@ public class TestCheckpoint {
throw new IOException(e);
}
- final int EXPECTED_TXNS_FIRST_SEG = 12;
+ final int EXPECTED_TXNS_FIRST_SEG = 11;
// the following steps should have happened:
// edits_inprogress_1 -> edits_1-12 (finalized)
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Sat Apr 13 23:05:54 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.assertEquals;
import java.io.File;
@@ -30,12 +31,14 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.Test;
+import static org.mockito.Mockito.*;
/**
* This class tests the creation and validation of a checkpoint.
@@ -163,4 +166,70 @@ public class TestSecurityTokenEditLog {
if(cluster != null) cluster.shutdown();
}
}
+
+ @Test(timeout=10000)
+ public void testEditsForCancelOnTokenExpire() throws IOException,
+ InterruptedException {
+ long renewInterval = 2000;
+ Configuration conf = new Configuration();
+ conf.setBoolean(
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+ conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, renewInterval);
+ conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, renewInterval*2);
+
+ Text renewer = new Text(UserGroupInformation.getCurrentUser().getUserName());
+ FSImage fsImage = mock(FSImage.class);
+ FSEditLog log = mock(FSEditLog.class);
+ doReturn(log).when(fsImage).getEditLog();
+ FSNamesystem fsn = new FSNamesystem(conf, fsImage);
+
+ DelegationTokenSecretManager dtsm = fsn.getDelegationTokenSecretManager();
+ try {
+ dtsm.startThreads();
+
+ // get two tokens
+ Token<DelegationTokenIdentifier> token1 = fsn.getDelegationToken(renewer);
+ Token<DelegationTokenIdentifier> token2 = fsn.getDelegationToken(renewer);
+ DelegationTokenIdentifier ident1 =
+ (DelegationTokenIdentifier)token1.decodeIdentifier();
+ DelegationTokenIdentifier ident2 =
+ (DelegationTokenIdentifier)token2.decodeIdentifier();
+
+ // verify we got the tokens
+ verify(log, times(1)).logGetDelegationToken(eq(ident1), anyLong());
+ verify(log, times(1)).logGetDelegationToken(eq(ident2), anyLong());
+
+ // this is a little tricky because DTSM doesn't let us set scan interval
+ // so need to periodically sleep, then stop/start threads to force scan
+
+ // renew first token 1/2 to expire
+ Thread.sleep(renewInterval/2);
+ fsn.renewDelegationToken(token2);
+ verify(log, times(1)).logRenewDelegationToken(eq(ident2), anyLong());
+ // force scan and give it a little time to complete
+ dtsm.stopThreads(); dtsm.startThreads();
+ Thread.sleep(250);
+ // no token has expired yet
+ verify(log, times(0)).logCancelDelegationToken(eq(ident1));
+ verify(log, times(0)).logCancelDelegationToken(eq(ident2));
+
+ // sleep past expiration of 1st non-renewed token
+ Thread.sleep(renewInterval/2);
+ dtsm.stopThreads(); dtsm.startThreads();
+ Thread.sleep(250);
+ // non-renewed token should have implicitly been cancelled
+ verify(log, times(1)).logCancelDelegationToken(eq(ident1));
+ verify(log, times(0)).logCancelDelegationToken(eq(ident2));
+
+ // sleep past expiration of 2nd renewed token
+ Thread.sleep(renewInterval/2);
+ dtsm.stopThreads(); dtsm.startThreads();
+ Thread.sleep(250);
+ // both tokens should have been implicitly cancelled by now
+ verify(log, times(1)).logCancelDelegationToken(eq(ident1));
+ verify(log, times(1)).logCancelDelegationToken(eq(ident2));
+ } finally {
+ dtsm.stopThreads();
+ }
+ }
}
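The repeated stop/start/sleep sequence in the test above is the scan-forcing trick its comments describe; it could be captured in a small helper, sketched here as a hypothetical refactoring:

    // Hypothetical helper: the secret manager's scan interval is not
    // configurable, so restarting its background threads forces a scan
    // for expired tokens; the short sleep lets that scan complete.
    private static void forceExpiryScan(DelegationTokenSecretManager dtsm)
        throws IOException, InterruptedException {
      dtsm.stopThreads();
      dtsm.startThreads();
      Thread.sleep(250);
    }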