You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/04/14 01:06:00 UTC

svn commit: r1467713 [4/4] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/proto...

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Sat Apr 13 23:05:54 2013
@@ -21,10 +21,13 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.assertFalse;
 
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,7 +35,6 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -40,15 +42,21 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import static org.hamcrest.CoreMatchers.*;
 
 /**
  * Test for short circuit read functionality using {@link BlockReaderLocal}.
@@ -58,9 +66,24 @@ import org.junit.Test;
  * system.
  */
 public class TestShortCircuitLocalRead {
+  private static TemporarySocketDirectory sockDir;
 
-  static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
 
+  @Before
+  public void before() {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+  }
+  
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 5120;
   boolean simulatedStorage = false;
@@ -84,7 +107,9 @@ public class TestShortCircuitLocalRead {
     for (int idx = 0; idx < len; idx++) {
       if (expected[from + idx] != actual[idx]) {
         Assert.fail(message + " byte " + (from + idx) + " differs. expected "
-            + expected[from + idx] + " actual " + actual[idx]);
+            + expected[from + idx] + " actual " + actual[idx] +
+            "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+            "\nactual:   " + StringUtils.byteToHexString(actual, 0, len));
       }
     }
   }
@@ -96,11 +121,13 @@ public class TestShortCircuitLocalRead {
   /** Check file content, reading as user {@code readingUser} */
   static void checkFileContent(URI uri, Path name, byte[] expected,
       int readOffset, String readingUser, Configuration conf,
-      boolean shortCircuitFails)
+      boolean legacyShortCircuitFails)
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
-    assertTrue(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+    }
     
     FSDataInputStream stm = fs.open(name);
     byte[] actual = new byte[expected.length-readOffset];
@@ -127,9 +154,8 @@ public class TestShortCircuitLocalRead {
     }
     checkData(actual, readOffset, expected, "Read 3");
     
-    if (shortCircuitFails) {
-      // short circuit should be disabled due to failure
-      assertFalse(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
     }
     stm.close();
   }
@@ -145,11 +171,13 @@ public class TestShortCircuitLocalRead {
   /** Check the file content, reading as user {@code readingUser} */
   static void checkFileContentDirect(URI uri, Path name, byte[] expected,
       int readOffset, String readingUser, Configuration conf,
-      boolean shortCircuitFails)
+      boolean legacyShortCircuitFails)
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
-    assertTrue(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+    }
     
     HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
 
@@ -180,33 +208,45 @@ public class TestShortCircuitLocalRead {
       nread += nbytes;
     }
     checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
-    if (shortCircuitFails) {
-      // short circuit should be disabled due to failure
-      assertFalse(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
     }
     stm.close();
   }
 
+  public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
+      int readOffset, String shortCircuitUser, String readingUser,
+      boolean legacyShortCircuitFails) throws IOException, InterruptedException {
+    doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+        shortCircuitUser, readingUser, legacyShortCircuitFails);
+  }
+
   public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
       int readOffset) throws IOException, InterruptedException {
     String shortCircuitUser = getCurrentUser();
-    doTestShortCircuitRead(ignoreChecksum, size, readOffset, shortCircuitUser,
-        shortCircuitUser, false);
+    doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+        null, getCurrentUser(), false);
   }
   
   /**
    * Test that file data can be read by reading the block file
    * directly from the local store.
    */
-  public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+  public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
       int readOffset, String shortCircuitUser, String readingUser,
-      boolean shortCircuitFails) throws IOException, InterruptedException {
+      boolean legacyShortCircuitFails) throws IOException, InterruptedException {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        shortCircuitUser);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
+    if (shortCircuitUser != null) {
+      conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+          shortCircuitUser);
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+    }
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -228,9 +268,9 @@ public class TestShortCircuitLocalRead {
       
       URI uri = cluster.getURI();
       checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
-          shortCircuitFails);
+          legacyShortCircuitFails);
       checkFileContentDirect(uri, file1, fileData, readOffset, readingUser,
-          conf, shortCircuitFails);
+          conf, legacyShortCircuitFails);
     } finally {
       fs.close();
       cluster.shutdown();
@@ -255,6 +295,12 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(true, 13, 5);
   }
   
+  @Test(timeout=10000)
+  public void testLocalReadLegacy() throws Exception {
+    doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(),
+        getCurrentUser(), false);
+  }
+
   /**
    * Try a short circuit from a reader that is not allowed to
    * to use short circuit. The test ensures reader falls back to non
@@ -262,7 +308,7 @@ public class TestShortCircuitLocalRead {
    */
   @Test(timeout=10000)
   public void testLocalReadFallback() throws Exception {
-    doTestShortCircuitRead(true, 13, 0, getCurrentUser(), "notallowed", true);
+    doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
   }
   
   @Test(timeout=10000)
@@ -276,7 +322,7 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(false, 10*blockSize+100, 777);
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }
-   
+
   private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
       final DatanodeID dnInfo, final Configuration conf) throws IOException,
       InterruptedException {
@@ -301,21 +347,15 @@ public class TestShortCircuitLocalRead {
   }
   
   @Test(timeout=10000)
-  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+  public void testDeprecatedGetBlockLocalPathInfoRpc()
+      throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        "alloweduser1,alloweduser2");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();
     cluster.waitActive();
-    final DataNode dn = cluster.getDataNodes().get(0);
     FileSystem fs = cluster.getFileSystem();
     try {
       DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
-      UserGroupInformation aUgi1 =
-          UserGroupInformation.createRemoteUser("alloweduser1");
-      UserGroupInformation aUgi2 =
-          UserGroupInformation.createRemoteUser("alloweduser2");
       LocatedBlocks lb = cluster.getNameNode().getRpcServer()
           .getBlockLocations("/tmp/x", 0, 16);
       // Create a new block object, because the block inside LocatedBlock at
@@ -323,29 +363,11 @@ public class TestShortCircuitLocalRead {
       ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
       Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
       final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
-      ClientDatanodeProtocol proxy = getProxy(aUgi1, dnInfo, conf);
-      // This should succeed
-      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Try with the other allowed user
-      proxy = getProxy(aUgi2, dnInfo, conf);
-
-      // This should succeed as well
-      blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Now try with a disallowed user
-      UserGroupInformation bUgi = UserGroupInformation
-          .createRemoteUser("notalloweduser");
-      proxy = getProxy(bUgi, dnInfo, conf);
+      ClientDatanodeProtocol proxy = 
+          DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
       try {
         proxy.getBlockLocalPathInfo(blk, token);
-        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+        Assert.fail("The call should have failed as this user "
             + " is not allowed to call getBlockLocalPathInfo");
       } catch (IOException ex) {
         Assert.assertTrue(ex.getMessage().contains(
@@ -363,8 +385,9 @@ public class TestShortCircuitLocalRead {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        getCurrentUser());
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "/tmp/testSkipWithVerifyChecksum._PORT");
+    DomainSocket.disableBindPathValidation();
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -402,6 +425,88 @@ public class TestShortCircuitLocalRead {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testHandleTruncatedBlockFile() throws IOException {
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "/tmp/testHandleTruncatedBlockFile._PORT");
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    final Path TEST_PATH = new Path("/a");
+    final Path TEST_PATH2 = new Path("/b");
+    final long RANDOM_SEED = 4567L;
+    final long RANDOM_SEED2 = 4568L;
+    FSDataInputStream fsIn = null;
+    final int TEST_LENGTH = 3456;
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_LENGTH, (short)1, RANDOM_SEED);
+      DFSTestUtil.createFile(fs, TEST_PATH2,
+          TEST_LENGTH, (short)1, RANDOM_SEED2);
+      fsIn = cluster.getFileSystem().open(TEST_PATH2);
+      byte original[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = MiniDFSCluster.getBlockFile(0, block);
+      cluster.shutdown();
+      cluster = null;
+      RandomAccessFile raf = null;
+      try {
+        raf = new RandomAccessFile(dataFile, "rw");
+        raf.setLength(0);
+      } finally {
+        if (raf != null) raf.close();
+      }
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      fsIn = fs.open(TEST_PATH);
+      try {
+        byte buf[] = new byte[100];
+        fsIn.seek(2000);
+        fsIn.readFully(buf, 0, buf.length);
+        Assert.fail("shouldn't be able to read from corrupt 0-length " +
+            "block file.");
+      } catch (IOException e) {
+        DFSClient.LOG.error("caught exception ", e);
+      }
+      fsIn.close();
+      fsIn = null;
+
+      // We should still be able to read the other file.
+      // This is important because it indicates that we detected that the 
+      // previous block was corrupt, rather than blaming the problem on
+      // communication.
+      fsIn = fs.open(TEST_PATH2);
+      byte buf[] = new byte[original.length];
+      fsIn.readFully(buf, 0, buf.length);
+      TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+          original.length);
+      fsIn.close();
+      fsIn = null;
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
      
   /**
    * Test to run benchmarks between short circuit read vs regular read with
@@ -424,6 +529,8 @@ public class TestShortCircuitLocalRead {
     // Setup create a file
     final Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "/tmp/TestShortCircuitLocalRead._PORT");
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         checksum);
     

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Sat Apr 13 23:05:54 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -145,8 +146,9 @@ public class TestBlockTokenWithDFS {
       String file = BlockReaderFactory.getFileName(targetAddr, 
           "test-blockpoolid", block.getBlockId());
       blockReader = BlockReaderFactory.newBlockReader(
-          conf, s, file, block, 
-          lblock.getBlockToken(), 0, -1, null);
+          conf, file, block, lblock.getBlockToken(), 0, -1,
+          true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
+          nodes[0], null, false);
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Apr 13 23:05:54 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implemen
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
   
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Sat Apr 13 23:05:54 2013
@@ -32,11 +32,13 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -280,10 +282,11 @@ public class TestDataNodeVolumeFailure {
     String file = BlockReaderFactory.getFileName(targetAddr, 
         "test-blockpoolid",
         block.getBlockId());
-    BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
-        .getBlockToken(), 0, -1, null);
-
-    // nothing - if it fails - it will throw and exception
+    BlockReader blockReader =
+      BlockReaderFactory.newBlockReader(conf, file, block,
+        lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
+        TcpPeerServer.peerFromSocket(s), datanode, null, false);
+    blockReader.close(null, null);
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Sat Apr 13 23:05:54 2013
@@ -1129,7 +1129,7 @@ public class TestCheckpoint {
         throw new IOException(e);
       }
       
-      final int EXPECTED_TXNS_FIRST_SEG = 12;
+      final int EXPECTED_TXNS_FIRST_SEG = 11;
       
       // the following steps should have happened:
       //   edits_inprogress_1 -> edits_1-12  (finalized)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Sat Apr 13 23:05:54 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
@@ -30,12 +31,14 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Test;
+import static org.mockito.Mockito.*;
 
 /**
  * This class tests the creation and validation of a checkpoint.
@@ -163,4 +166,70 @@ public class TestSecurityTokenEditLog {
       if(cluster != null) cluster.shutdown();
     }
   }
+  
+  @Test(timeout=10000)
+  public void testEditsForCancelOnTokenExpire() throws IOException,
+  InterruptedException {
+    long renewInterval = 2000;
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+    conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, renewInterval);
+    conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, renewInterval*2);
+
+    Text renewer = new Text(UserGroupInformation.getCurrentUser().getUserName());
+    FSImage fsImage = mock(FSImage.class);
+    FSEditLog log = mock(FSEditLog.class);
+    doReturn(log).when(fsImage).getEditLog();   
+    FSNamesystem fsn = new FSNamesystem(conf, fsImage);
+    
+    DelegationTokenSecretManager dtsm = fsn.getDelegationTokenSecretManager();
+    try {
+      dtsm.startThreads();
+      
+      // get two tokens
+      Token<DelegationTokenIdentifier> token1 = fsn.getDelegationToken(renewer);
+      Token<DelegationTokenIdentifier> token2 = fsn.getDelegationToken(renewer);
+      DelegationTokenIdentifier ident1 =
+          (DelegationTokenIdentifier)token1.decodeIdentifier();
+      DelegationTokenIdentifier ident2 =
+          (DelegationTokenIdentifier)token2.decodeIdentifier();
+      
+      // verify we got the tokens
+      verify(log, times(1)).logGetDelegationToken(eq(ident1), anyLong());
+      verify(log, times(1)).logGetDelegationToken(eq(ident2), anyLong());
+      
+      // this is a little tricky because DTSM doesn't let us set scan interval
+      // so need to periodically sleep, then stop/start threads to force scan
+      
+      // renew first token 1/2 to expire
+      Thread.sleep(renewInterval/2);
+      fsn.renewDelegationToken(token2);
+      verify(log, times(1)).logRenewDelegationToken(eq(ident2), anyLong());
+      // force scan and give it a little time to complete
+      dtsm.stopThreads(); dtsm.startThreads();
+      Thread.sleep(250);
+      // no token has expired yet 
+      verify(log, times(0)).logCancelDelegationToken(eq(ident1));
+      verify(log, times(0)).logCancelDelegationToken(eq(ident2));
+      
+      // sleep past expiration of 1st non-renewed token
+      Thread.sleep(renewInterval/2);
+      dtsm.stopThreads(); dtsm.startThreads();
+      Thread.sleep(250);
+      // non-renewed token should have implicitly been cancelled
+      verify(log, times(1)).logCancelDelegationToken(eq(ident1));
+      verify(log, times(0)).logCancelDelegationToken(eq(ident2));
+      
+      // sleep past expiration of 2nd renewed token
+      Thread.sleep(renewInterval/2);
+      dtsm.stopThreads(); dtsm.startThreads();
+      Thread.sleep(250);
+      // both tokens should have been implicitly cancelled by now
+      verify(log, times(1)).logCancelDelegationToken(eq(ident1));
+      verify(log, times(1)).logCancelDelegationToken(eq(ident2));
+    } finally {
+      dtsm.stopThreads();
+    }
+  }
 }