You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/08/12 19:30:27 UTC
svn commit: r803622 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/
src/test/hdfs/org/apache/hadoo...
Author: szetszwo
Date: Wed Aug 12 17:30:27 2009
New Revision: 803622
URL: http://svn.apache.org/viewvc?rev=803622&view=rev
Log:
HDFS-409. Add more access token tests. Contributed by Kan Zhang
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Aug 12 17:30:27 2009
@@ -92,6 +92,8 @@
HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for
DataTransferProtocol. (szetszwo)
+ HDFS-409. Add more access token tests. (Kan Zhang via szetszwo)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Aug 12 17:30:27 2009
@@ -1449,11 +1449,15 @@
if (status != SUCCESS) {
if (status == ERROR_ACCESS_TOKEN) {
throw new InvalidAccessTokenException(
- "Got access token error in response to OP_READ_BLOCK "
- + "for file " + file + " for block " + blockId);
+ "Got access token error for OP_READ_BLOCK, self="
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ + ", for block " + blockId + "_" + genStamp);
} else {
- throw new IOException("Got error in response to OP_READ_BLOCK "
- + "for file " + file + " for block " + blockId);
+ throw new IOException("Got error for OP_READ_BLOCK, self="
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ + ", for block " + blockId + "_" + genStamp);
}
}
DataChecksum checksum = DataChecksum.newDataChecksum( in );
@@ -1729,9 +1733,10 @@
buffersize, verifyChecksum, clientName);
return chosenNode;
} catch (IOException ex) {
- LOG.debug("Failed to connect to " + targetAddr + ":"
- + StringUtils.stringifyException(ex));
- if (ex instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+ if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+ LOG.info("Will fetch a new access token and retry, "
+ + "access token was invalid when connecting to " + targetAddr
+ + " : " + ex);
/*
* Get a new access token and retry. Retry is needed in 2 cases. 1)
* When both NN and DN re-started while DFSClient holding a cached
@@ -1742,8 +1747,11 @@
* access key from its memory since it's considered expired based on
* the estimated expiration date.
*/
+ refetchToken--;
fetchBlockAt(target);
} else {
+ LOG.info("Failed to connect to " + targetAddr
+ + ", add to deadNodes and continue", ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
@@ -1964,12 +1972,11 @@
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
- if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
- LOG.info("Invalid access token when connecting to " + targetAddr
- + " for file " + src + " for block "
- + block.getBlock() + ":"
- + StringUtils.stringifyException(e)
- + ", get a new access token and retry...");
+ if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+ LOG.info("Will get a new access token and retry, "
+ + "access token was invalid when connecting to " + targetAddr
+ + " : " + e);
+ refetchToken--;
fetchBlockAt(block.getStartOffset());
continue;
} else {
@@ -2982,6 +2989,10 @@
return nodes;
}
+ AccessToken getAccessToken() { // accessor with no access modifier; hands back the accessToken field itself, not a copy
+ return accessToken;
+ }
+
private void setLastException(IOException e) {
if (lastException == null) {
lastException = e;
@@ -3386,6 +3397,14 @@
long getInitialLen() {
return initialFileSize;
}
+
+ /**
+ * Returns the access token currently used by the streamer. Intended for tests only.
+ */
+ AccessToken getAccessToken() {
+ return streamer.getAccessToken(); // pure delegation to the streamer's own accessor
+ }
+
}
void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 12 17:30:27 2009
@@ -140,8 +140,8 @@
try {
ERROR_ACCESS_TOKEN.write(out);
out.flush();
- throw new IOException("Access token verification failed, on client "
- + "request for reading block " + block);
+ throw new IOException("Access token verification failed, for client "
+ + remoteAddress + " for OP_READ_BLOCK for block " + block);
} finally {
IOUtils.closeStream(out);
}
@@ -235,8 +235,8 @@
Text.writeString(replyOut, datanode.dnRegistration.getName());
replyOut.flush();
}
- throw new IOException("Access token verification failed, on client "
- + "request for writing block " + block);
+ throw new IOException("Access token verification failed, for client "
+ + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
} finally {
IOUtils.closeStream(replyOut);
}
@@ -388,8 +388,8 @@
ERROR_ACCESS_TOKEN.write(out);
out.flush();
throw new IOException(
- "Access token verification failed, on getBlockChecksum() "
- + "for block " + block);
+ "Access token verification failed, for client " + remoteAddress
+ + " for OP_BLOCK_CHECKSUM for block " + block);
} finally {
IOUtils.closeStream(out);
}
@@ -443,7 +443,7 @@
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
AccessTokenHandler.AccessMode.COPY)) {
LOG.warn("Invalid access token in request from "
- + s.getRemoteSocketAddress() + " for copying block " + block);
+ + remoteAddress + " for OP_COPY_BLOCK for block " + block);
sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
return;
}
@@ -515,7 +515,7 @@
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
AccessTokenHandler.AccessMode.REPLACE)) {
LOG.warn("Invalid access token in request from "
- + s.getRemoteSocketAddress() + " for replacing block " + block);
+ + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
return;
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Aug 12 17:30:27 2009
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
+import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessToken;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -249,6 +252,15 @@
return in.getCurrentBlock();
}
+ public static List<LocatedBlock> getAllBlocks(FSDataInputStream in) // test helper exposing the block list held by a DFS input stream
+ throws IOException {
+ return ((DFSClient.DFSDataInputStream) in).getAllBlocks(); // unchecked downcast: ClassCastException unless in is a DFSClient.DFSDataInputStream
+ }
+
+ public static AccessToken getAccessToken(FSDataOutputStream out) { // test helper: token held by the DFS stream wrapped inside out
+ return ((DFSClient.DFSOutputStream) out.getWrappedStream()).getAccessToken(); // unchecked downcast: only valid when the wrapped stream is a DFSClient.DFSOutputStream
+ }
+
static void setLogLevel2All(org.apache.commons.logging.Log log) {
((org.apache.commons.logging.impl.Log4JLogger)log
).getLogger().setLevel(org.apache.log4j.Level.ALL);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Aug 12 17:30:27 2009
@@ -32,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.*;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -43,6 +44,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.*;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
/**
@@ -572,6 +574,45 @@
}
}
+ /**
+ * Shutdown namenode: stop it, join it, then drop the reference. Does nothing when nameNode is already null.
+ */
+ public synchronized void shutdownNameNode() {
+ if (nameNode != null) {
+ System.out.println("Shutting down the namenode");
+ nameNode.stop();
+ nameNode.join(); // presumably blocks until the namenode has finished stopping -- confirm against NameNode.join()
+ nameNode = null; // a repeated call now falls through the null check above
+ }
+ }
+
+ /**
+ * Restart namenode: shut the current one down, create a new one from conf, then call waitClusterUp() and waitActive().
+ */
+ public synchronized void restartNameNode() throws IOException {
+ shutdownNameNode();
+ nameNode = NameNode.createNameNode(new String[] {}, conf); // empty argument array; reuses the conf already in scope
+ waitClusterUp();
+ System.out.println("Restarted the namenode");
+ int failedCount = 0;
+ while (true) { // at most two waitActive() attempts: the first IOException is tolerated, the second is rethrown
+ try {
+ waitActive();
+ break;
+ } catch (IOException e) {
+ failedCount++;
+ // Cached RPC connection to namenode, if any, is expected to fail once
+ if (failedCount > 1) {
+ System.out.println("Tried waitActive() " + failedCount
+ + " time(s) and failed, giving up. "
+ + StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+ }
+ System.out.println("Cluster is active");
+ }
+
/*
* Corrupt a block on all datanode
*/
@@ -611,7 +652,7 @@
/*
* Shutdown a particular datanode
*/
- public DataNodeProperties stopDataNode(int i) {
+ public synchronized DataNodeProperties stopDataNode(int i) {
if (i < 0 || i >= dataNodes.size()) {
return null;
}
@@ -626,60 +667,91 @@
return dnprop;
}
+ /*
+ * Shutdown the first datanode whose registration name equals the given name; yields null when no datanode matches.
+ */
+ public synchronized DataNodeProperties stopDataNode(String name) {
+ int i;
+ for (i = 0; i < dataNodes.size(); i++) { // linear scan; i is declared outside the loop because it is used after it
+ DataNode dn = dataNodes.get(i).datanode;
+ if (dn.dnRegistration.getName().equals(name)) {
+ break;
+ }
+ }
+ return stopDataNode(i); // on no match i == dataNodes.size(), which stopDataNode(int) rejects by returning null
+ }
+
/**
* Restart a datanode
* @param dnprop datanode's property
* @return true if restarting is successful
* @throws IOException
*/
- public synchronized boolean restartDataNode(DataNodeProperties dnprop)
- throws IOException {
+ public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
+ return restartDataNode(dnprop, false);
+ }
+
+ /**
+ * Restart a datanode, on the same port if requested
+ * @param dnprop the datanode to restart
+ * @param keepPort whether to use the same port
+ * @return true if restarting is successful
+ * @throws IOException
+ */
+ public synchronized boolean restartDataNode(DataNodeProperties dnprop,
+ boolean keepPort) throws IOException {
Configuration conf = dnprop.conf;
String[] args = dnprop.dnArgs;
Configuration newconf = new Configuration(conf); // save cloned config
- dataNodes.add(new DataNodeProperties(
- DataNode.createDataNode(args, conf),
- newconf, args));
+ if (keepPort) {
+ InetSocketAddress addr = dnprop.datanode.getSelfAddr();
+ conf.set("dfs.datanode.address", addr.getAddress().getHostAddress() + ":"
+ + addr.getPort());
+ }
+ dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf),
+ newconf, args));
numDataNodes++;
return true;
+ }
+ /*
+ * Restart a particular datanode, use newly assigned port
+ */
+ public boolean restartDataNode(int i) throws IOException {
+ return restartDataNode(i, false);
}
+
/*
- * Restart a particular datanode
+ * Restart a particular datanode, on the same port if keepPort is true
*/
- public synchronized boolean restartDataNode(int i) throws IOException {
+ public synchronized boolean restartDataNode(int i, boolean keepPort)
+ throws IOException {
DataNodeProperties dnprop = stopDataNode(i);
if (dnprop == null) {
return false;
} else {
- return restartDataNode(dnprop);
+ return restartDataNode(dnprop, keepPort);
}
}
/*
- * Restart all datanodes
+ * Restart all datanodes, on the same ports if keepPort is true
*/
- public synchronized boolean restartDataNodes() throws IOException {
- for (int i = dataNodes.size()-1; i >= 0; i--) {
- System.out.println("Restarting DataNode " + i);
- if (!restartDataNode(i))
+ public synchronized boolean restartDataNodes(boolean keepPort)
+ throws IOException {
+ for (int i = dataNodes.size() - 1; i >= 0; i--) {
+ if (!restartDataNode(i, keepPort))
return false;
+ System.out.println("Restarted DataNode " + i);
}
return true;
}
/*
- * Shutdown a datanode by name.
+ * Restart all datanodes, use newly assigned ports
*/
- public synchronized DataNodeProperties stopDataNode(String name) {
- int i;
- for (i = 0; i < dataNodes.size(); i++) {
- DataNode dn = dataNodes.get(i).datanode;
- if (dn.dnRegistration.getName().equals(name)) {
- break;
- }
- }
- return stopDataNode(i);
+ public boolean restartDataNodes() throws IOException {
+ return restartDataNodes(false);
}
/**
@@ -739,18 +811,31 @@
getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
- // make sure all datanodes are alive
- while(client.datanodeReport(DatanodeReportType.LIVE).length
- != numDataNodes) {
+ // make sure all datanodes have registered and sent heartbeat
+ while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
try {
Thread.sleep(100);
- } catch (Exception e) {
+ } catch (InterruptedException e) {
}
}
client.close();
}
+ private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) { // true means the caller should keep polling the live-datanode report
+ if (dnInfo.length != numDataNodes) { // the report does not yet list exactly the expected number of datanodes
+ return true;
+ }
+ // make sure all datanodes have sent first heartbeat to namenode,
+ // using (capacity == 0) as proxy.
+ for (DatanodeInfo dn : dnInfo) {
+ if (dn.getCapacity() == 0) { // one datanode still reporting zero capacity is enough to keep waiting
+ return true;
+ }
+ }
+ return false;
+ }
+
public void formatDataNodeDirs() throws IOException {
base_dir = new File(getBaseDirectory());
data_dir = new File(base_dir, "data");
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed Aug 12 17:30:27 2009
@@ -34,7 +34,6 @@
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessTokenHandler;
import junit.framework.TestCase;
/**
@@ -60,7 +59,6 @@
}
private void initConf(Configuration conf) {
- conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
conf.setLong("dfs.heartbeat.interval", 1L);
@@ -259,23 +257,32 @@
} while(!balanced);
}
+
+ /** One-node cluster test: a single CAPACITY node on RACK0, run through test() with the given conf. */
+ private void oneNodeTest(Configuration conf) throws Exception {
+ // add an empty node with half of the CAPACITY & the same rack
+ test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+ }
+
+ /** Two-node cluster test: CAPACITY nodes on RACK0 and RACK1; the trailing (CAPACITY, RACK2) presumably describe the node added afterwards -- confirm against test(). */
+ private void twoNodeTest(Configuration conf) throws Exception {
+ test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2);
+ }
+
+ /** Runs the one-node balancer test using a user-supplied conf; entry point for other test classes. */
+ public void integrationTest(Configuration conf) throws Exception {
+ initConf(conf); // mutates the caller's conf in place (block size, checksum size, heartbeat interval)
+ oneNodeTest(conf);
+ }
+
/** Test a cluster with even distribution,
* then a new empty node is added to the cluster*/
public void testBalancer0() throws Exception {
Configuration conf = new Configuration();
initConf(conf);
- /** one-node cluster test*/
- // add an empty node with half of the CAPACITY & the same rack
- test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
-
- /** two-node cluster test */
- test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
- CAPACITY, RACK2);
-
- /** End-to-end testing of access token, involving NN, DN, and Balancer */
- Configuration newConf = new Configuration(conf);
- newConf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
- test(newConf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+ oneNodeTest(conf);
+ twoNodeTest(conf);
}
/** Test unevenly distributed cluster */
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java?rev=803622&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java Wed Aug 12 17:30:27 2009
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.InvalidAccessTokenException;
+import org.apache.hadoop.security.SecurityTestUtil;
+import org.apache.log4j.Level;
+
+import junit.framework.TestCase;
+
+public class TestAccessTokenWithDFS extends TestCase {
+
+ private static final int BLOCK_SIZE = 1024;
+ private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+ private static final String FILE_TO_READ = "/fileToRead.dat";
+ private static final String FILE_TO_WRITE = "/fileToWrite.dat";
+ private static final String FILE_TO_APPEND = "/fileToAppend.dat";
+ private final byte[] rawData = new byte[FILE_SIZE];
+
+ {
+ ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ Random r = new Random();
+ r.nextBytes(rawData);
+ }
+
+ private void createFile(FileSystem fs, Path filename) throws IOException { // creates filename and fills it with the whole rawData buffer
+ FSDataOutputStream out = fs.create(filename);
+ out.write(rawData);
+ out.close(); // NOTE(review): not in a finally block, so the stream stays open if write() throws
+ }
+
+ // Reads the stream sequentially from its current position and compares it with rawData; per the author this exercises blockSeekTo().
+ private boolean checkFile1(FSDataInputStream in) {
+ byte[] toRead = new byte[FILE_SIZE];
+ int totalRead = 0;
+ int nRead = 0;
+ try {
+ while ((nRead = in.read(toRead, totalRead, toRead.length - totalRead)) > 0) { // ends on end-of-stream or once the buffer is full
+ totalRead += nRead;
+ }
+ } catch (IOException e) { // any read failure is reported as false; the exception itself is discarded
+ return false;
+ }
+ assertEquals("Cannot read file.", toRead.length, totalRead); // a short read fails the test outright instead of returning false
+ return checkFile(toRead);
+ }
+
+ // Reads FILE_SIZE bytes with one positional read at offset 0 and compares them with rawData; per the author this exercises fetchBlockByteRange().
+ private boolean checkFile2(FSDataInputStream in) {
+ byte[] toRead = new byte[FILE_SIZE];
+ try {
+ assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0, // a single call must return the full length; there is no retry loop
+ toRead.length));
+ } catch (IOException e) { // any read failure is reported as false; the exception itself is discarded
+ return false;
+ }
+ return checkFile(toRead);
+ }
+
+ private boolean checkFile(byte[] fileToCheck) { // true only if fileToCheck matches rawData in length and in every byte
+ if (fileToCheck.length != rawData.length) {
+ return false;
+ }
+ for (int i = 0; i < fileToCheck.length; i++) {
+ if (fileToCheck[i] != rawData[i]) { // stop at the first differing byte
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Creates a file with the given replication and block size and returns the still-open output stream; closing it is the caller's job.
+ private static FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+ short repl, long blockSize) throws IOException {
+ FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf() // true is the overwrite flag of FileSystem.create
+ .getInt("io.file.buffer.size", 4096), repl, blockSize); // buffer size comes from the file system's conf, defaulting to 4096
+ return stm;
+ }
+
+ // Opens a BlockReader directly against the block's first listed location and asserts that success or token rejection matches shouldSucceed.
+ private static void tryRead(Configuration conf, LocatedBlock lblock,
+ boolean shouldSucceed) {
+ InetSocketAddress targetAddr = null;
+ Socket s = null;
+ DFSClient.BlockReader blockReader = null;
+ Block block = lblock.getBlock();
+ try {
+ DatanodeInfo[] nodes = lblock.getLocations();
+ targetAddr = NetUtils.createSocketAddr(nodes[0].getName()); // only the first location is tried
+ s = new Socket();
+ s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+ s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+ blockReader = DFSClient.BlockReader.newBlockReader(s, targetAddr
+ .toString()
+ + ":" + block.getBlockId(), block.getBlockId(), lblock
+ .getAccessToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
+ "io.file.buffer.size", 4096));
+
+ } catch (IOException ex) {
+ if (ex instanceof InvalidAccessTokenException) { // token rejection is acceptable only when the caller expected failure
+ assertFalse("OP_READ_BLOCK: access token is invalid, "
+ + "when it is expected to be valid", shouldSucceed);
+ return;
+ }
+ fail("OP_READ_BLOCK failed due to reasons other than access token"); // any other IOException always fails the test
+ } finally {
+ if (s != null) {
+ try {
+ s.close();
+ } catch (IOException iex) { // close failure is ignored; the socket is test-local
+ } finally {
+ s = null;
+ }
+ }
+ }
+ if (blockReader == null) { // defensive: reached without an exception yet no reader was produced
+ fail("OP_READ_BLOCK failed due to reasons other than access token");
+ }
+ assertTrue("OP_READ_BLOCK: access token is valid, " // reader creation succeeded, so the caller must have expected success
+ + "when it is expected to be invalid", shouldSucceed);
+ }
+
+ // Builds a fresh Configuration for these tests: access tokens and append enabled, block size BLOCK_SIZE, replication numDataNodes.
+ private static Configuration getConf(int numDataNodes) throws IOException { // NOTE(review): nothing visible in this body throws IOException
+ Configuration conf = new Configuration();
+ conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+ conf.setLong("dfs.block.size", BLOCK_SIZE);
+ conf.setInt("io.bytes.per.checksum", BLOCK_SIZE); // checksum chunk size set equal to the block size
+ conf.setInt("dfs.heartbeat.interval", 1);
+ conf.setInt("dfs.replication", numDataNodes); // replication equals the datanode count passed in
+ conf.setInt("ipc.client.connect.max.retries", 0);
+ conf.setBoolean("dfs.support.append", true);
+ return conf;
+ }
+
+ /*
+ * testing that APPEND operation can handle token expiration when
+ * re-establishing pipeline is needed
+ */
+ public void testAppend() throws Exception {
+ MiniDFSCluster cluster = null;
+ int numDataNodes = 2;
+ Configuration conf = getConf(numDataNodes);
+
+ try {
+ cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // set a short token lifetime (1 second)
+ SecurityTestUtil.setAccessTokenLifetime(
+ cluster.getNamesystem().accessTokenHandler, 1000L);
+ Path fileToAppend = new Path(FILE_TO_APPEND);
+ FileSystem fs = cluster.getFileSystem();
+
+ // write a one-byte file
+ FSDataOutputStream stm = writeFile(fs, fileToAppend,
+ (short) numDataNodes, BLOCK_SIZE);
+ stm.write(rawData, 0, 1);
+ stm.close();
+ // open the file again for append
+ stm = fs.append(fileToAppend);
+ int mid = rawData.length - 1; // index of the last byte; everything before it is written ahead of the token expiry
+ stm.write(rawData, 1, mid - 1); // bytes 1 .. mid-1, continuing after the single byte already in the file
+ stm.sync();
+
+ /*
+ * wait till token used in stm expires
+ */
+ AccessToken token = DFSTestUtil.getAccessToken(stm);
+ while (!SecurityTestUtil.isAccessTokenExpired(token)) { // polls every 10 ms; no upper bound on the wait
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) { // interruption is deliberately ignored; the loop just re-checks expiry
+ }
+ }
+
+ // remove a datanode to force re-establishing pipeline
+ cluster.stopDataNode(0);
+ // append the rest of the file
+ stm.write(rawData, mid, rawData.length - mid); // the one remaining byte
+ stm.close();
+ // check if append is successful
+ FSDataInputStream in5 = fs.open(fileToAppend); // NOTE(review): in5 is never closed explicitly; it is left to cluster shutdown
+ assertTrue(checkFile1(in5));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /*
+ * testing that WRITE operation can handle token expiration when
+ * re-establishing pipeline is needed
+ */
+ public void testWrite() throws Exception {
+ MiniDFSCluster cluster = null;
+ int numDataNodes = 2;
+ Configuration conf = getConf(numDataNodes);
+
+ try {
+ cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // set a short token lifetime (1 second)
+ SecurityTestUtil.setAccessTokenLifetime(
+ cluster.getNamesystem().accessTokenHandler, 1000L);
+ Path fileToWrite = new Path(FILE_TO_WRITE);
+ FileSystem fs = cluster.getFileSystem();
+
+ FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
+ BLOCK_SIZE);
+ // write all but the last byte, so the final block is left partial
+ int mid = rawData.length - 1; // index of the last byte of rawData
+ stm.write(rawData, 0, mid);
+ stm.sync();
+
+ /*
+ * wait till token used in stm expires
+ */
+ AccessToken token = DFSTestUtil.getAccessToken(stm);
+ while (!SecurityTestUtil.isAccessTokenExpired(token)) { // polls every 10 ms; no upper bound on the wait
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) { // interruption is deliberately ignored; the loop just re-checks expiry
+ }
+ }
+
+ // remove a datanode to force re-establishing pipeline
+ cluster.stopDataNode(0);
+ // write the rest of the file
+ stm.write(rawData, mid, rawData.length - mid); // the one remaining byte
+ stm.close();
+ // check if write is successful
+ FSDataInputStream in4 = fs.open(fileToWrite); // NOTE(review): in4 is never closed explicitly; it is left to cluster shutdown
+ assertTrue(checkFile1(in4));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ public void testRead() throws Exception {
+ MiniDFSCluster cluster = null;
+ int numDataNodes = 2;
+ Configuration conf = getConf(numDataNodes);
+
+ try {
+ cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // set a short token lifetime (1 second) initially
+ SecurityTestUtil.setAccessTokenLifetime(
+ cluster.getNamesystem().accessTokenHandler, 1000L);
+ Path fileToRead = new Path(FILE_TO_READ);
+ FileSystem fs = cluster.getFileSystem();
+ createFile(fs, fileToRead);
+
+ /*
+ * setup for testing expiration handling of cached tokens
+ */
+
+ // read using blockSeekTo(). Acquired tokens are cached in in1
+ FSDataInputStream in1 = fs.open(fileToRead);
+ assertTrue(checkFile1(in1));
+ // read using blockSeekTo(). Acquired tokens are cached in in2
+ FSDataInputStream in2 = fs.open(fileToRead);
+ assertTrue(checkFile1(in2));
+ // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+ FSDataInputStream in3 = fs.open(fileToRead);
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing READ interface on DN using a BlockReader
+ */
+
+ DFSClient dfsclient = new DFSClient(new InetSocketAddress("localhost",
+ cluster.getNameNodePort()), conf);
+ List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
+ FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+ LocatedBlock lblock = locatedBlocks.get(0); // first block
+ AccessToken myToken = lblock.getAccessToken();
+ // verify token is not expired
+ assertFalse(SecurityTestUtil.isAccessTokenExpired(myToken));
+ // read with valid token, should succeed
+ tryRead(conf, lblock, true);
+
+ /*
+ * wait till myToken and all cached tokens in in1, in2 and in3 expire
+ */
+
+ while (!SecurityTestUtil.isAccessTokenExpired(myToken)) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ /*
+ * continue testing READ interface on DN using a BlockReader
+ */
+
+ // verify token is expired
+ assertTrue(SecurityTestUtil.isAccessTokenExpired(myToken));
+ // read should fail
+ tryRead(conf, lblock, false);
+ // use a valid new token
+ lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+ .generateToken(lblock.getBlock().getBlockId(), EnumSet
+ .of(AccessTokenHandler.AccessMode.READ)));
+ // read should succeed
+ tryRead(conf, lblock, true);
+ // use a token with wrong blockID
+ lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+ .generateToken(lblock.getBlock().getBlockId() + 1, EnumSet
+ .of(AccessTokenHandler.AccessMode.READ)));
+ // read should fail
+ tryRead(conf, lblock, false);
+ // use a token with wrong access modes
+ lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+ .generateToken(lblock.getBlock().getBlockId(), EnumSet.of(
+ AccessTokenHandler.AccessMode.WRITE,
+ AccessTokenHandler.AccessMode.COPY,
+ AccessTokenHandler.AccessMode.REPLACE)));
+ // read should fail
+ tryRead(conf, lblock, false);
+
+ // set a long token lifetime for future tokens
+ SecurityTestUtil.setAccessTokenLifetime(
+ cluster.getNamesystem().accessTokenHandler, 600 * 1000L);
+
+ /*
+ * testing that when cached tokens are expired, DFSClient will re-fetch
+ * tokens transparently for READ.
+ */
+
+ // confirm all tokens cached in in1 are expired by now
+ List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+ for (LocatedBlock blk : lblocks) {
+ assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+ }
+ // verify blockSeekTo() is able to re-fetch token transparently
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+
+ // confirm all tokens cached in in2 are expired by now
+ List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+ for (LocatedBlock blk : lblocks2) {
+ assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+ }
+ // verify blockSeekTo() is able to re-fetch token transparently (testing
+ // via another interface method)
+ assertTrue(in2.seekToNewSource(0));
+ assertTrue(checkFile1(in2));
+
+ // confirm all tokens cached in in3 are expired by now
+ List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+ for (LocatedBlock blk : lblocks3) {
+ assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+ }
+ // verify fetchBlockByteRange() is able to re-fetch token transparently
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that after datanodes are restarted on the same ports, cached
+ * tokens should still work and there is no need to fetch new tokens from
+ * namenode. This test should run while namenode is down (to make sure no
+ * new tokens can be fetched from namenode).
+ */
+
+ // restart datanodes on the same ports that they currently use
+ assertTrue(cluster.restartDataNodes(true));
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ cluster.shutdownNameNode();
+
+ // confirm tokens cached in in1 are still valid
+ lblocks = DFSTestUtil.getAllBlocks(in1);
+ for (LocatedBlock blk : lblocks) {
+ assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+ }
+ // verify blockSeekTo() still works (forced to use cached tokens)
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+
+ // confirm tokens cached in in2 are still valid
+ lblocks2 = DFSTestUtil.getAllBlocks(in2);
+ for (LocatedBlock blk : lblocks2) {
+ assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+ }
+ // verify blockSeekTo() still works (forced to use cached tokens)
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+
+ // confirm tokens cached in in3 are still valid
+ lblocks3 = DFSTestUtil.getAllBlocks(in3);
+ for (LocatedBlock blk : lblocks3) {
+ assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+ }
+ // verify fetchBlockByteRange() still works (forced to use cached tokens)
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that when namenode is restarted, cached tokens should still
+ * work and there is no need to fetch new tokens from namenode. Like the
+ * previous test, this test should also run while namenode is down. The
+ * setup for this test depends on the previous test.
+ */
+
+ // restart the namenode and then shut it down for test
+ cluster.restartNameNode();
+ cluster.shutdownNameNode();
+
+ // verify blockSeekTo() still works (forced to use cached tokens)
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+ // verify again blockSeekTo() still works (forced to use cached tokens)
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+ // verify fetchBlockByteRange() still works (forced to use cached tokens)
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that after both namenode and datanodes got restarted (namenode
+ * first, followed by datanodes), DFSClient can't access DN without
+ * re-fetching tokens and is able to re-fetch tokens transparently. The
+ * setup of this test depends on the previous test.
+ */
+
+ // restore the cluster and restart the datanodes for test
+ cluster.restartNameNode();
+ assertTrue(cluster.restartDataNodes(true));
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+
+ // shutdown namenode so that DFSClient can't get new tokens from namenode
+ cluster.shutdownNameNode();
+
+ // verify blockSeekTo() fails (cached tokens become invalid)
+ in1.seek(0);
+ assertFalse(checkFile1(in1));
+ // verify fetchBlockByteRange() fails (cached tokens become invalid)
+ assertFalse(checkFile2(in3));
+
+ // restart the namenode to allow DFSClient to re-fetch tokens
+ cluster.restartNameNode();
+ // verify blockSeekTo() works again (by transparently re-fetching
+ // tokens from namenode)
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+ // verify fetchBlockByteRange() works again (by transparently
+ // re-fetching tokens from namenode)
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that when datanodes are restarted on different ports, DFSClient
+ * is able to re-fetch tokens transparently to connect to them
+ */
+
+ // restart datanodes on newly assigned ports
+ assertTrue(cluster.restartDataNodes(false));
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // verify blockSeekTo() is able to re-fetch token transparently
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+ // verify blockSeekTo() is able to re-fetch token transparently
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+ // verify fetchBlockByteRange() is able to re-fetch token transparently
+ assertTrue(checkFile2(in3));
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /*
+ * Integration testing of access token, involving NN, DN, and Balancer: runs TestBalancer's integrationTest() with the access token flag switched on.
+ */
+ public void testEnd2End() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true); // set the enable-access-token flag
+ new TestBalancer().integrationTest(conf); // reuse the balancer integration test with this conf
+ }
+}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java Wed Aug 12 17:30:27 2009
@@ -99,7 +99,7 @@
// check if excessive replica is detected
do {
num = namesystem.blockManager.countNodes(block);
- } while (num.excessReplicas() == 2);
+ } while (num.excessReplicas() != 2);
} finally {
cluster.shutdown();
}