Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/06/06 02:18:04 UTC
svn commit: r1346682 [3/9] - in
/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/contrib/bkjournal/
ha...
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Jun 6 00:17:38 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -25,6 +27,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -49,13 +52,19 @@ import com.google.common.annotations.Vis
*/
@InterfaceAudience.Private
public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+ private static final String enableDebugLogging =
+ "For more information, please enable DEBUG log level on "
+ + ((Log4JLogger)LOG).getLogger().getName();
+
private boolean considerLoad;
private boolean preferLocalNode = true;
private NetworkTopology clusterMap;
private FSClusterStats stats;
- static final String enableDebugLogging = "For more information, please enable"
- + " DEBUG level logging on the "
- + "org.apache.hadoop.hdfs.server.namenode.FSNamesystem logger.";
+ private long heartbeatInterval; // interval for DataNode heartbeats
+ /**
+ * The number of heartbeats a DataNode may miss before its replicas are preferred for deletion.
+ */
+ private int tolerateHeartbeatMultiplier;
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
@@ -71,6 +80,12 @@ public class BlockPlacementPolicyDefault
this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
+ this.heartbeatInterval = conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
+ this.tolerateHeartbeatMultiplier = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
+ DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
}
private ThreadLocal<StringBuilder> threadLocalBuilder =
@@ -551,24 +566,33 @@ public class BlockPlacementPolicyDefault
short replicationFactor,
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
+ long oldestHeartbeat =
+ now() - heartbeatInterval * tolerateHeartbeatMultiplier;
+ DatanodeDescriptor oldestHeartbeatNode = null;
long minSpace = Long.MAX_VALUE;
- DatanodeDescriptor cur = null;
+ DatanodeDescriptor minSpaceNode = null;
// pick replica from the first Set. If first is empty, then pick replicas
// from second set.
Iterator<DatanodeDescriptor> iter =
first.isEmpty() ? second.iterator() : first.iterator();
- // pick node with least free space
+ // Pick the node with the oldest heartbeat or with the least free space,
+ // if all heartbeats are within the tolerable heartbeat interval
while (iter.hasNext() ) {
DatanodeDescriptor node = iter.next();
long free = node.getRemaining();
+ long lastHeartbeat = node.getLastUpdate();
+ if(lastHeartbeat < oldestHeartbeat) {
+ oldestHeartbeat = lastHeartbeat;
+ oldestHeartbeatNode = node;
+ }
if (minSpace > free) {
minSpace = free;
- cur = node;
+ minSpaceNode = node;
}
}
- return cur;
+ return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
}
@VisibleForTesting
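As a worked illustration of the new deletion preference: with a 3 second heartbeat interval and a tolerance multiplier of 4 (assumed defaults here; the real values come from DFSConfigKeys), any node silent for more than 12,000 ms becomes the deletion candidate outright, and free space only breaks the tie when no node is that stale. A minimal sketch, assuming getLastUpdate() returns a millisecond timestamp:

    // Hypothetical default values; production reads them from DFSConfigKeys.
    long heartbeatInterval = 3L * 1000;      // dfs.heartbeat.interval, in ms
    int tolerateHeartbeatMultiplier = 4;     // dfs.namenode.tolerate.heartbeat.multiplier
    // Staleness threshold: 12,000 ms before now(). A node whose last
    // heartbeat predates it is chosen before any minimum-space node.
    long oldestHeartbeat = now() - heartbeatInterval * tolerateHeartbeatMultiplier;
    boolean preferredForDeletion = node.getLastUpdate() < oldestHeartbeat;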
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Jun 6 00:17:38 2012
@@ -100,11 +100,7 @@ public class DatanodeManager {
* with the same storage id; and </li>
* <li>removed if and only if an existing datanode is restarted to serve a
* different storage id.</li>
- * </ul> <br>
- * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
- * in the namespace image file. Only the {@link DatanodeInfo} part is
- * persistent, the list of blocks is restored from the datanode block
- * reports.
+ * </ul> <br>
* <p>
* Mapping: StorageID -> DatanodeDescriptor
*/
@@ -832,7 +828,9 @@ public class DatanodeManager {
if (InetAddresses.isInetAddress(hostStr)) {
// The IP:port is sufficient for listing in a report
- dnId = new DatanodeID(hostStr, "", port);
+ dnId = new DatanodeID(hostStr, "", "", port,
+ DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+ DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
} else {
String ipAddr = "";
try {
@@ -840,7 +838,9 @@ public class DatanodeManager {
} catch (UnknownHostException e) {
LOG.warn("Invalid hostname " + hostStr + " in hosts file");
}
- dnId = new DatanodeID(ipAddr, hostStr, port);
+ dnId = new DatanodeID(ipAddr, hostStr, "", port,
+ DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+ DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
}
return dnId;
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Wed Jun 6 00:17:38 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.bl
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -54,10 +53,23 @@ class InvalidateBlocks {
return numBlocks;
}
- /** Does this contain the block which is associated with the storage? */
+ /**
+ * @return true if the given storage has the given block listed for
+ * invalidation. Blocks are compared including their generation stamps:
+ * if a block is pending invalidation but with a different generation stamp,
+ * returns false.
+ * @param storageID the storage to check
+ * @param block the block to look for
+ *
+ */
synchronized boolean contains(final String storageID, final Block block) {
- final Collection<Block> s = node2blocks.get(storageID);
- return s != null && s.contains(block);
+ final LightWeightHashSet<Block> s = node2blocks.get(storageID);
+ if (s == null) {
+ return false; // no invalidate blocks for this storage ID
+ }
+ Block blockInSet = s.getElement(block);
+ return blockInSet != null &&
+ block.getGenerationStamp() == blockInSet.getGenerationStamp();
}
/**
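A usage sketch of the tightened contains() contract: two Block objects with the same block ID but different generation stamps no longer match, because getElement() locates the stored block and the stamps are then compared. The add() call and the Block constructor shape (id, length, generationStamp) are assumptions for illustration:

    // Suppose block 42 was queued for invalidation at generation stamp 1001.
    Block queued = new Block(42L, 0L, 1001L);
    invalidateBlocks.add(queued, datanode, true);    // hypothetical add call

    Block stale = new Block(42L, 0L, 1000L);         // same ID, older stamp
    invalidateBlocks.contains(storageID, stale);     // now false
    invalidateBlocks.contains(storageID, queued);    // true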
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java Wed Jun 6 00:17:38 2012
@@ -19,26 +19,20 @@ package org.apache.hadoop.hdfs.server.bl
import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.fs.ContentSummary;
-
/**
* This interface is used by the block manager to expose a
* few characteristics of a collection of Block/BlockUnderConstruction.
*/
public interface MutableBlockCollection extends BlockCollection {
/**
- * Set block
+ * Set the block at the given index.
*/
- public void setBlock(int idx, BlockInfo blk);
+ public void setBlock(int index, BlockInfo blk);
/**
- * Convert the last block of the collection to an under-construction block.
- * Set its locations.
+ * Convert the last block of the collection to an under-construction block
+ * and set the locations.
*/
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
- DatanodeDescriptor[] targets) throws IOException;
+ DatanodeDescriptor[] locations) throws IOException;
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Jun 6 00:17:38 2012
@@ -71,10 +71,12 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+
@InterfaceAudience.Private
public class JspHelper {
public static final String CURRENT_CONF = "current.conf";
- final static public String WEB_UGI_PROPERTY_NAME = DFSConfigKeys.DFS_WEB_UGI_KEY;
public static final String DELEGATION_PARAMETER_NAME = DelegationParam.NAME;
public static final String NAMENODE_ADDRESS = "nnaddr";
static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +
@@ -438,9 +440,9 @@ public class JspHelper {
/** Return a table containing version information. */
public static String getVersionTable() {
- return "<div id='dfstable'><table>"
- + "\n <tr><td id='col1'>Version:</td><td>" + VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
- + "\n <tr><td id='col1'>Compiled:</td><td>" + VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch()
+ return "<div class='dfstable'><table>"
+ + "\n <tr><td class='col1'>Version:</td><td>" + VersionInfo.getVersion() + ", " + VersionInfo.getRevision() + "</td></tr>"
+ + "\n <tr><td class='col1'>Compiled:</td><td>" + VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch() + "</td></tr>"
+ "\n</table></div>";
}
@@ -483,11 +485,12 @@ public class JspHelper {
*/
public static UserGroupInformation getDefaultWebUser(Configuration conf
) throws IOException {
- String[] strings = conf.getStrings(JspHelper.WEB_UGI_PROPERTY_NAME);
- if (strings == null || strings.length == 0) {
+ String user = conf.get(
+ HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER);
+ if (user == null || user.length() == 0) {
throw new IOException("Cannot determine UGI from request or conf");
}
- return UserGroupInformation.createRemoteUser(strings[0]);
+ return UserGroupInformation.createRemoteUser(user);
}
private static InetSocketAddress getNNServiceAddress(ServletContext context,
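The replacement reads one static web user from the common configuration instead of the old dfs.web.ugi list. A configuration sketch; the "dr.who" fallback is my reading of the common-side default, so treat it as an assumption:

    Configuration conf = new Configuration();
    // hadoop.http.static.user; when unset, the common default (assumed
    // to be "dr.who") is used instead of the first entry of a UGI list.
    conf.set(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER, "webuser");
    UserGroupInformation ugi = JspHelper.getDefaultWebUser(conf);
    // ugi.getShortUserName() == "webuser"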
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Jun 6 00:17:38 2012
@@ -591,7 +591,8 @@ class BPOfferService {
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
- dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+ String who = "NameNode at " + actor.getNNSocketAddress();
+ dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -608,6 +609,9 @@ class BPOfferService {
if (bandwidth > 0) {
DataXceiverServer dxcs =
(DataXceiverServer) dn.dataXceiverServer.getRunnable();
+ LOG.info("Updating balance throttler bandwidth from "
+ + dxcs.balanceThrottler.getBandwidth() + " bytes/s "
+ + "to: " + bandwidth + " bytes/s.");
dxcs.balanceThrottler.setBandwidth(bandwidth);
}
break;
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java Wed Jun 6 00:17:38 2012
@@ -145,7 +145,7 @@ class BlockPoolManager {
void refreshNamenodes(Configuration conf)
throws IOException {
LOG.info("Refresh request received for nameservices: "
- + conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES));
+ + conf.get(DFSConfigKeys.DFS_NAMESERVICES));
Map<String, Map<String, InetSocketAddress>> newAddressMap =
DFSUtil.getNNServiceRpcAddresses(conf);
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Jun 6 00:17:38 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -110,6 +111,8 @@ class BlockReceiver implements Closeable
private final BlockConstructionStage stage;
private final boolean isTransfer;
+ private boolean syncOnClose;
+
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
@@ -245,14 +248,18 @@ class BlockReceiver implements Closeable
* close files.
*/
public void close() throws IOException {
-
IOException ioe = null;
+ if (syncOnClose && (out != null || checksumOut != null)) {
+ datanode.metrics.incrFsyncCount();
+ }
// close checksum file
try {
if (checksumOut != null) {
checksumOut.flush();
- if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
+ if (syncOnClose && (cout instanceof FileOutputStream)) {
+ long start = Util.now();
((FileOutputStream)cout).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
}
checksumOut.close();
checksumOut = null;
@@ -267,8 +274,10 @@ class BlockReceiver implements Closeable
try {
if (out != null) {
out.flush();
- if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
+ if (syncOnClose && (out instanceof FileOutputStream)) {
+ long start = Util.now();
((FileOutputStream)out).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
}
out.close();
out = null;
@@ -290,12 +299,25 @@ class BlockReceiver implements Closeable
* Flush block data and metadata files to disk.
* @throws IOException
*/
- void flush() throws IOException {
+ void flushOrSync(boolean isSync) throws IOException {
+ if (isSync && (out != null || checksumOut != null)) {
+ datanode.metrics.incrFsyncCount();
+ }
if (checksumOut != null) {
checksumOut.flush();
+ if (isSync && (cout instanceof FileOutputStream)) {
+ long start = Util.now();
+ ((FileOutputStream)cout).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
+ }
}
if (out != null) {
out.flush();
+ if (isSync && (out instanceof FileOutputStream)) {
+ long start = Util.now();
+ ((FileOutputStream)out).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
+ }
}
}
@@ -533,7 +555,9 @@ class BlockReceiver implements Closeable
header.getOffsetInBlock(),
header.getSeqno(),
header.isLastPacketInBlock(),
- header.getDataLen(), endOfHeader);
+ header.getDataLen(),
+ header.getSyncBlock(),
+ endOfHeader);
}
/**
@@ -549,15 +573,19 @@ class BlockReceiver implements Closeable
* returns the number of data bytes that the packet has.
*/
private int receivePacket(long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
+ boolean lastPacketInBlock, int len, boolean syncBlock,
+ int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
" of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
+ " syncBlock " + syncBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
-
+ // make sure the block gets sync'ed upon close
+ this.syncOnClose |= syncBlock && lastPacketInBlock;
+
// update received bytes
long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
@@ -587,6 +615,10 @@ class BlockReceiver implements Closeable
if(LOG.isDebugEnabled()) {
LOG.debug("Receiving an empty packet or the end of the block " + block);
}
+ // flush unless close() would flush anyway
+ if (syncBlock && !lastPacketInBlock) {
+ flushOrSync(true);
+ }
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@@ -677,8 +709,8 @@ class BlockReceiver implements Closeable
);
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
- /// flush entire packet
- flush();
+ /// flush entire packet, sync unless close() will sync
+ flushOrSync(syncBlock && !lastPacketInBlock);
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
@@ -730,6 +762,7 @@ class BlockReceiver implements Closeable
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams) throws IOException {
+ syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
@@ -768,7 +801,7 @@ class BlockReceiver implements Closeable
datanode.data.convertTemporaryToRbw(block);
} else {
// for isDatanode or TRANSFER_FINALIZED
- // Finalize the block. Does this fsync()?
+ // Finalize the block.
datanode.data.finalizeBlock(block);
}
datanode.metrics.incrBlocksWritten();
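Taken together, the receiver now syncs in three places, all funneled through flushOrSync(boolean). A condensed restatement of the decision, using the same names as the patch:

    // Per data packet: force to disk only when the client asked for a sync
    // on this packet and close() will not already cover it.
    flushOrSync(syncBlock && !lastPacketInBlock);

    // Empty or heartbeat packet carrying a sync request: same rule.
    if (syncBlock && !lastPacketInBlock) {
      flushOrSync(true);
    }

    // On close: sync if dfs.datanode.synconclose is configured, or if the
    // last packet of the block requested a sync.
    syncOnClose = datanode.getDnConf().syncOnClose;       // initial value
    this.syncOnClose |= syncBlock && lastPacketInBlock;   // per-block upgrade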
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Jun 6 00:17:38 2012
@@ -701,8 +701,9 @@ class BlockSender implements java.io.Clo
*/
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
+ // syncBlock is false: reads never request a per-packet sync
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
- (dataLen == 0), dataLen);
+ (dataLen == 0), dataLen, false);
header.putInBuffer(pkt);
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Jun 6 00:17:38 2012
@@ -163,6 +163,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@@ -667,23 +668,16 @@ public class DataNode extends Configured
* @param nsInfo the namespace info from the first part of the NN handshake
*/
DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
- final String xferIp = streamingAddr.getAddress().getHostAddress();
- DatanodeRegistration bpRegistration = new DatanodeRegistration(xferIp, getXferPort());
- bpRegistration.setInfoPort(getInfoPort());
- bpRegistration.setIpcPort(getIpcPort());
- bpRegistration.setHostName(hostName);
- bpRegistration.setStorageID(getStorageId());
- bpRegistration.setSoftwareVersion(VersionInfo.getVersion());
-
StorageInfo storageInfo = storage.getBPStorage(nsInfo.getBlockPoolID());
if (storageInfo == null) {
// it's null in the case of SimulatedDataSet
- bpRegistration.getStorageInfo().layoutVersion = HdfsConstants.LAYOUT_VERSION;
- bpRegistration.setStorageInfo(nsInfo);
- } else {
- bpRegistration.setStorageInfo(storageInfo);
+ storageInfo = new StorageInfo(nsInfo);
}
- return bpRegistration;
+ DatanodeID dnId = new DatanodeID(
+ streamingAddr.getAddress().getHostAddress(), hostName,
+ getStorageId(), getXferPort(), getInfoPort(), getIpcPort());
+ return new DatanodeRegistration(dnId, storageInfo,
+ new ExportedBlockKeys(), VersionInfo.getVersion());
}
/**
@@ -1713,13 +1707,16 @@ public class DataNode extends Configured
secureMain(args, null);
}
- public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
+ public Daemon recoverBlocks(
+ final String who,
+ final Collection<RecoveringBlock> blocks) {
+
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(RecoveringBlock b : blocks) {
try {
- logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+ logRecoverBlock(who, b);
recoverBlock(b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
@@ -1980,14 +1977,13 @@ public class DataNode extends Configured
datanodes, storages);
}
- private static void logRecoverBlock(String who,
- ExtendedBlock block, DatanodeID[] targets) {
- StringBuilder msg = new StringBuilder(targets[0].toString());
- for (int i = 1; i < targets.length; i++) {
- msg.append(", " + targets[i]);
- }
+ private static void logRecoverBlock(String who, RecoveringBlock rb) {
+ ExtendedBlock block = rb.getBlock();
+ DatanodeInfo[] targets = rb.getLocations();
+
LOG.info(who + " calls recoverBlock(block=" + block
- + ", targets=[" + msg + "])");
+ + ", targets=[" + Joiner.on(", ").join(targets) + "]"
+ + ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
}
@Override // ClientDataNodeProtocol
@@ -2032,6 +2028,18 @@ public class DataNode extends Configured
//get replica information
synchronized(data) {
+ Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
+ b.getBlockId());
+ if (null == storedBlock) {
+ throw new IOException(b + " not found in datanode.");
+ }
+ storedGS = storedBlock.getGenerationStamp();
+ if (storedGS < b.getGenerationStamp()) {
+ throw new IOException(storedGS
+ + " = storedGS < b.getGenerationStamp(), b=" + b);
+ }
+ // Update the genstamp with storedGS
+ b.setGenerationStamp(storedGS);
if (data.isValidRbw(b)) {
stage = BlockConstructionStage.TRANSFER_RBW;
} else if (data.isValidBlock(b)) {
@@ -2040,18 +2048,9 @@ public class DataNode extends Configured
final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
}
-
- storedGS = data.getStoredBlock(b.getBlockPoolId(),
- b.getBlockId()).getGenerationStamp();
- if (storedGS < b.getGenerationStamp()) {
- throw new IOException(
- storedGS + " = storedGS < b.getGenerationStamp(), b=" + b);
- }
visible = data.getReplicaVisibleLength(b);
}
-
- //set storedGS and visible length
- b.setGenerationStamp(storedGS);
+ //set visible length
b.setNumBytes(visible);
if (targets.length > 0) {
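The point of this reshuffle is ordering: the stored replica is validated before any state checks run, so a missing block now fails fast with a descriptive IOException instead of the NullPointerException the old inline getStoredBlock().getGenerationStamp() chain could produce. The new sequence, condensed:

    // 1. Look up the on-disk replica first; fail fast if it is gone.
    Block storedBlock = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
    if (storedBlock == null) {
      throw new IOException(b + " not found in datanode.");
    }
    // 2. The stored generation stamp must be >= the requested one.
    long storedGS = storedBlock.getGenerationStamp();
    if (storedGS < b.getGenerationStamp()) {
      throw new IOException(storedGS + " = storedGS < b.getGenerationStamp(), b=" + b);
    }
    // 3. Only then classify the replica (RBW vs. finalized).
    b.setGenerationStamp(storedGS);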
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Wed Jun 6 00:17:38 2012
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.net.URLEncoder;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
@@ -616,9 +617,12 @@ public class DatanodeJspHelper {
Configuration conf
) throws IOException,
InterruptedException {
- final String referrer = JspHelper.validateURL(req.getParameter("referrer"));
+ String referrer = null;
boolean noLink = false;
- if (referrer == null) {
+ try {
+ referrer = new URL(req.getParameter("referrer")).toString();
+ } catch (IOException e) {
+ referrer = null;
noLink = true;
}
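java.net.URL does the validation here: its constructor throws MalformedURLException (an IOException subclass) for malformed input, and also when the parameter is null, which is why one catch of IOException covers the missing-parameter case as well. A small sketch, assuming a servlet-style request:

    String param = req.getParameter("referrer");    // may be null
    String referrer = null;
    boolean noLink = false;
    try {
      // new URL(null) and new URL("not a url") both throw
      // MalformedURLException, which extends IOException.
      referrer = new URL(param).toString();
    } catch (IOException e) {
      noLink = true;    // render the page without a back link
    }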
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java Wed Jun 6 00:17:38 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.security.UserGroupInformation;
import org.mortbay.jetty.nio.SelectChannelConnector;
/**
@@ -60,10 +61,7 @@ public class SecureDataNodeStarter imple
@Override
public void init(DaemonContext context) throws Exception {
System.err.println("Initializing secure datanode resources");
- // We should only start up a secure datanode in a Kerberos-secured cluster
- Configuration conf = new Configuration(); // Skip UGI method to not log in
- if(!conf.get(HADOOP_SECURITY_AUTHENTICATION).equals("kerberos"))
- throw new RuntimeException("Cannot start secure datanode in unsecure cluster");
+ Configuration conf = new Configuration();
// Stash command-line arguments for regular datanode
args = context.getArguments();
@@ -98,7 +96,8 @@ public class SecureDataNodeStarter imple
System.err.println("Successfully obtained privileged resources (streaming port = "
+ ss + " ) (http listener port = " + listener.getConnection() +")");
- if (ss.getLocalPort() >= 1023 || listener.getPort() >= 1023) {
+ if ((ss.getLocalPort() >= 1023 || listener.getPort() >= 1023) &&
+ UserGroupInformation.isSecurityEnabled()) {
throw new RuntimeException("Cannot start secure datanode with unprivileged ports");
}
System.err.println("Opened streaming server at " + streamingAddr);
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Wed Jun 6 00:17:38 2012
@@ -61,6 +61,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong writesFromLocalClient;
@Metric MutableCounterLong writesFromRemoteClient;
@Metric MutableCounterLong blocksGetLocalPathInfo;
+
+ @Metric MutableCounterLong fsyncCount;
@Metric MutableCounterLong volumeFailures;
@@ -72,6 +74,8 @@ public class DataNodeMetrics {
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
+ @Metric MutableRate fsync;
+
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@@ -151,6 +155,14 @@ public class DataNodeMetrics {
blocksRead.incr();
}
+ public void incrFsyncCount() {
+ fsyncCount.incr();
+ }
+
+ public void addFsync(long latency) {
+ fsync.add(latency);
+ }
+
public void shutdown() {
DefaultMetricsSystem.shutdown();
}
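Call sites pair the two new metrics: the counter records how many fsyncs happened, the rate records their latency. The pattern used in BlockReceiver, condensed:

    // Count the fsync, then time the force() call and record the latency.
    datanode.metrics.incrFsyncCount();
    long start = Util.now();
    ((FileOutputStream) out).getChannel().force(true);   // the actual fsync
    datanode.metrics.addFsync(Util.now() - start);       // latency in ms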
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Jun 6 00:17:38 2012
@@ -207,7 +207,7 @@ public class BackupImage extends FSImage
int logVersion = storage.getLayoutVersion();
backupInputStream.setBytes(data, logVersion);
- long numTxnsAdvanced = logLoader.loadEditRecords(logVersion,
+ long numTxnsAdvanced = logLoader.loadEditRecords(
backupInputStream, true, lastAppliedTxId + 1, null);
if (numTxnsAdvanced != numTxns) {
throw new IOException("Batch of txns starting at txnid " +
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Wed Jun 6 00:17:38 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.util.Collection;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -60,19 +61,10 @@ class BackupJournalManager implements Jo
}
@Override
- public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
- throws IOException, CorruptionException {
+ public void selectInputStreams(Collection<EditLogInputStream> streams,
+ long fromTxnId, boolean inProgressOk) {
// This JournalManager is never used for input. Therefore it cannot
// return any transactions
- return 0;
- }
-
- @Override
- public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
- throws IOException {
- // This JournalManager is never used for input. Therefore it cannot
- // return any transactions
- throw new IOException("Unsupported operation");
}
@Override
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Wed Jun 6 00:17:38 2012
@@ -119,7 +119,7 @@ class EditLogBackupInputStream extends E
this.version = version;
- reader = new FSEditLogOp.Reader(in, version);
+ reader = new FSEditLogOp.Reader(in, tracker, version);
}
void clear() throws IOException {
@@ -129,12 +129,12 @@ class EditLogBackupInputStream extends E
}
@Override
- public long getFirstTxId() throws IOException {
+ public long getFirstTxId() {
return HdfsConstants.INVALID_TXID;
}
@Override
- public long getLastTxId() throws IOException {
+ public long getLastTxId() {
return HdfsConstants.INVALID_TXID;
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Wed Jun 6 00:17:38 2012
@@ -24,10 +24,14 @@ import java.io.IOException;
import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.DataInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
/**
* An implementation of the abstract class {@link EditLogInputStream}, which
@@ -35,13 +39,21 @@ import com.google.common.annotations.Vis
*/
public class EditLogFileInputStream extends EditLogInputStream {
private final File file;
- private final FileInputStream fStream;
- final private long firstTxId;
- final private long lastTxId;
- private final int logVersion;
- private final FSEditLogOp.Reader reader;
- private final FSEditLogLoader.PositionTrackingInputStream tracker;
+ private final long firstTxId;
+ private final long lastTxId;
private final boolean isInProgress;
+ static private enum State {
+ UNINIT,
+ OPEN,
+ CLOSED
+ }
+ private State state = State.UNINIT;
+ private FileInputStream fStream = null;
+ private int logVersion = 0;
+ private FSEditLogOp.Reader reader = null;
+ private FSEditLogLoader.PositionTrackingInputStream tracker = null;
+ private DataInputStream dataIn = null;
+ static final Log LOG = LogFactory.getLog(EditLogInputStream.class);
/**
* Open an EditLogInputStream for the given file.
@@ -68,34 +80,43 @@ public class EditLogFileInputStream exte
* header
*/
public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
- boolean isInProgress)
- throws LogHeaderCorruptException, IOException {
- file = name;
- fStream = new FileInputStream(name);
-
- BufferedInputStream bin = new BufferedInputStream(fStream);
- tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
- DataInputStream in = new DataInputStream(tracker);
-
- try {
- logVersion = readLogVersion(in);
- } catch (EOFException eofe) {
- throw new LogHeaderCorruptException("No header found in log");
- }
-
- reader = new FSEditLogOp.Reader(in, logVersion);
+ boolean isInProgress) {
+ this.file = name;
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.isInProgress = isInProgress;
}
+ private void init() throws LogHeaderCorruptException, IOException {
+ Preconditions.checkState(state == State.UNINIT);
+ BufferedInputStream bin = null;
+ try {
+ fStream = new FileInputStream(file);
+ bin = new BufferedInputStream(fStream);
+ tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
+ dataIn = new DataInputStream(tracker);
+ try {
+ logVersion = readLogVersion(dataIn);
+ } catch (EOFException eofe) {
+ throw new LogHeaderCorruptException("No header found in log");
+ }
+ reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+ state = State.OPEN;
+ } finally {
+ if (reader == null) {
+ IOUtils.cleanup(LOG, dataIn, tracker, bin, fStream);
+ state = State.CLOSED;
+ }
+ }
+ }
+
@Override
- public long getFirstTxId() throws IOException {
+ public long getFirstTxId() {
return firstTxId;
}
@Override
- public long getLastTxId() throws IOException {
+ public long getLastTxId() {
return lastTxId;
}
@@ -104,33 +125,95 @@ public class EditLogFileInputStream exte
return file.getPath();
}
+ private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
+ FSEditLogOp op = null;
+ switch (state) {
+ case UNINIT:
+ try {
+ init();
+ } catch (Throwable e) {
+ LOG.error("caught exception initializing " + this, e);
+ if (skipBrokenEdits) {
+ return null;
+ }
+ Throwables.propagateIfPossible(e, IOException.class);
+ }
+ Preconditions.checkState(state != State.UNINIT);
+ return nextOpImpl(skipBrokenEdits);
+ case OPEN:
+ op = reader.readOp(skipBrokenEdits);
+ if ((op != null) && (op.hasTransactionId())) {
+ long txId = op.getTransactionId();
+ if ((txId >= lastTxId) &&
+ (lastTxId != HdfsConstants.INVALID_TXID)) {
+ //
+ // Sometimes, the NameNode crashes while it's writing to the
+ // edit log. In that case, you can end up with an unfinalized edit log
+ // which has some garbage at the end.
+ // JournalManager#recoverUnfinalizedSegments will finalize these
+ // unfinished edit logs, giving them a defined final transaction
+ // ID. Then they will be renamed, so that any subsequent
+ // readers will have this information.
+ //
+ // Since there may be garbage at the end of these "cleaned up"
+ // logs, we want to be sure to skip it here if we've read everything
+ // we were supposed to read out of the stream.
+ // So we force an EOF on all subsequent reads.
+ //
+ long skipAmt = file.length() - tracker.getPos();
+ if (skipAmt > 0) {
+ LOG.warn("skipping " + skipAmt + " bytes at the end " +
+ "of edit log '" + getName() + "': reached txid " + txId +
+ " out of " + lastTxId);
+ tracker.skip(skipAmt);
+ }
+ }
+ }
+ break;
+ case CLOSED:
+ break; // return null
+ }
+ return op;
+ }
+
@Override
protected FSEditLogOp nextOp() throws IOException {
- return reader.readOp(false);
+ return nextOpImpl(false);
}
-
+
@Override
protected FSEditLogOp nextValidOp() {
try {
- return reader.readOp(true);
- } catch (IOException e) {
+ return nextOpImpl(true);
+ } catch (Throwable e) {
+ LOG.error("nextValidOp: got exception while reading " + this, e);
return null;
}
}
@Override
public int getVersion() throws IOException {
+ if (state == State.UNINIT) {
+ init();
+ }
return logVersion;
}
@Override
public long getPosition() {
- return tracker.getPos();
+ if (state == State.OPEN) {
+ return tracker.getPos();
+ } else {
+ return 0;
+ }
}
@Override
public void close() throws IOException {
- fStream.close();
+ if (state == State.OPEN) {
+ dataIn.close();
+ }
+ state = State.CLOSED;
}
@Override
@@ -153,12 +236,12 @@ public class EditLogFileInputStream exte
EditLogFileInputStream in;
try {
in = new EditLogFileInputStream(file);
- } catch (LogHeaderCorruptException corrupt) {
+ in.getVersion(); // causes us to read the header
+ } catch (LogHeaderCorruptException e) {
// If the header is malformed or the wrong value, this indicates a corruption
- FSImage.LOG.warn("Log at " + file + " has no valid header",
- corrupt);
+ LOG.warn("Log file " + file + " has no valid header", e);
return new FSEditLogLoader.EditLogValidation(0,
- HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true);
+ HdfsConstants.INVALID_TXID, true);
}
try {
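The constructor no longer touches the file system; opening is deferred to a three-state machine (UNINIT -> OPEN -> CLOSED) driven by init(). The pattern in isolation, with the HDFS-specific header reading replaced by labeled placeholders:

    // Lazy-open pattern as used by EditLogFileInputStream above.
    private enum State { UNINIT, OPEN, CLOSED }
    private State state = State.UNINIT;

    private void ensureOpen() throws IOException {
      if (state == State.UNINIT) {
        try {
          openStreamsAndReadHeader();   // placeholder for init()'s body
          state = State.OPEN;
        } catch (IOException e) {
          closeQuietly();               // placeholder: leave nothing half-open
          state = State.CLOSED;
          throw e;
        }
      }
    }
    // Reads consult the state: UNINIT triggers init(), CLOSED yields null,
    // and close() is safe to call in any state.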
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Jun 6 00:17:38 2012
@@ -41,12 +41,13 @@ import com.google.common.annotations.Vis
@InterfaceAudience.Private
public class EditLogFileOutputStream extends EditLogOutputStream {
private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
+ public static final int PREALLOCATION_LENGTH = 1024 * 1024;
private File file;
private FileOutputStream fp; // file stream for storing edit logs
private FileChannel fc; // channel of the file stream for sync
private EditsDoubleBuffer doubleBuf;
- static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
+ static ByteBuffer fill = ByteBuffer.allocateDirect(PREALLOCATION_LENGTH);
static {
fill.position(0);
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Wed Jun 6 00:17:38 2012
@@ -45,12 +45,12 @@ public abstract class EditLogInputStream
/**
* @return the first transaction which will be found in this stream
*/
- public abstract long getFirstTxId() throws IOException;
+ public abstract long getFirstTxId();
/**
* @return the last transaction which will be found in this stream
*/
- public abstract long getLastTxId() throws IOException;
+ public abstract long getLastTxId();
/**
@@ -73,14 +73,14 @@ public abstract class EditLogInputStream
}
return nextOp();
}
-
+
/**
* Position the stream so that a valid operation can be read from it with
* readOp().
*
* This method can be used to skip over corrupted sections of edit logs.
*/
- public void resync() throws IOException {
+ public void resync() {
if (cachedOp != null) {
return;
}
@@ -109,7 +109,7 @@ public abstract class EditLogInputStream
// error recovery will want to override this.
try {
return nextOp();
- } catch (IOException e) {
+ } catch (Throwable e) {
return null;
}
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Jun 6 00:17:38 2012
@@ -25,6 +25,7 @@ import java.lang.reflect.Constructor;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -232,6 +233,10 @@ public class FSEditLog {
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
journalSet = new JournalSet(minimumRedundantJournals);
+ // set runtime so we can test starting with a faulty or unavailable
+ // shared directory
+ this.journalSet.setRuntimeForTesting(runtime);
+
for (URI u : dirs) {
boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
.contains(u);
@@ -269,13 +274,14 @@ public class FSEditLog {
long segmentTxId = getLastWrittenTxId() + 1;
// Safety check: we should never start a segment if there are
// newer txids readable.
- EditLogInputStream s = journalSet.getInputStream(segmentTxId, true);
- try {
- Preconditions.checkState(s == null,
- "Cannot start writing at txid %s when there is a stream " +
- "available for read: %s", segmentTxId, s);
- } finally {
- IOUtils.closeStream(s);
+ List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+ journalSet.selectInputStreams(streams, segmentTxId, true);
+ if (!streams.isEmpty()) {
+ String error = String.format("Cannot start writing at txid %s " +
+ "when there is a stream available for read: %s",
+ segmentTxId, streams.get(0));
+ IOUtils.cleanup(LOG, streams.toArray(new EditLogInputStream[0]));
+ throw new IllegalStateException(error);
}
startLogSegmentAndWriteHeaderTxn(segmentTxId);
@@ -895,7 +901,7 @@ public class FSEditLog {
* Used only by unit tests.
*/
@VisibleForTesting
- synchronized void setRuntimeForTesting(Runtime runtime) {
+ synchronized public void setRuntimeForTesting(Runtime runtime) {
this.runtime = runtime;
this.journalSet.setRuntimeForTesting(runtime);
}
@@ -1199,10 +1205,10 @@ public class FSEditLog {
// All journals have failed, it is handled in logSync.
}
}
-
- Collection<EditLogInputStream> selectInputStreams(long fromTxId,
- long toAtLeastTxId) throws IOException {
- return selectInputStreams(fromTxId, toAtLeastTxId, true);
+
+ public Collection<EditLogInputStream> selectInputStreams(
+ long fromTxId, long toAtLeastTxId) throws IOException {
+ return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
}
/**
@@ -1212,25 +1218,71 @@ public class FSEditLog {
* @param toAtLeast the selected streams must contain this transaction
* @param inProgressOk set to true if in-progress streams are OK
*/
- public synchronized Collection<EditLogInputStream> selectInputStreams(long fromTxId,
- long toAtLeastTxId, boolean inProgressOk) throws IOException {
+ public synchronized Collection<EditLogInputStream> selectInputStreams(
+ long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
+ boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
- EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk);
- while (stream != null) {
- streams.add(stream);
- // We're now looking for a higher range, so reset the fromTxId
- fromTxId = stream.getLastTxId() + 1;
- stream = journalSet.getInputStream(fromTxId, inProgressOk);
+ journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+
+ try {
+ checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
+ } catch (IOException e) {
+ if (recovery != null) {
+ // If recovery mode is enabled, continue loading even if we know we
+ // can't load up to toAtLeastTxId.
+ LOG.error(e);
+ } else {
+ closeAllStreams(streams);
+ throw e;
+ }
}
-
- if (fromTxId <= toAtLeastTxId) {
- closeAllStreams(streams);
- throw new IOException(String.format("Gap in transactions. Expected to "
- + "be able to read up until at least txid %d but unable to find any "
- + "edit logs containing txid %d", toAtLeastTxId, fromTxId));
+ // This code will go away as soon as RedundantEditLogInputStream is
+ // introduced. (HDFS-3049)
+ try {
+ if (!streams.isEmpty()) {
+ streams.get(0).skipUntil(fromTxId);
+ }
+ } catch (IOException e) {
+ // We don't want to throw an exception from here, because that would make
+ // recovery impossible even if the user requested it. An exception will
+ // be thrown later, when we don't read the starting txid we expect.
+ LOG.error("error skipping until transaction " + fromTxId, e);
}
return streams;
}
+
+ /**
+ * Check for gaps in the edit log input stream list.
+ * Note: we're assuming that the list is sorted and that txid ranges don't
+ * overlap. This could be done better and with more generality with an
+ * interval tree.
+ */
+ private void checkForGaps(List<EditLogInputStream> streams, long fromTxId,
+ long toAtLeastTxId, boolean inProgressOk) throws IOException {
+ Iterator<EditLogInputStream> iter = streams.iterator();
+ long txId = fromTxId;
+ while (true) {
+ if (txId > toAtLeastTxId) return;
+ if (!iter.hasNext()) break;
+ EditLogInputStream elis = iter.next();
+ if (elis.getFirstTxId() > txId) break;
+ long next = elis.getLastTxId();
+ if (next == HdfsConstants.INVALID_TXID) {
+ if (!inProgressOk) {
+ throw new RuntimeException("inProgressOk = false, but " +
+ "selectInputStreams returned an in-progress edit " +
+ "log input stream (" + elis + ")");
+ }
+ // We don't know where the in-progress stream ends.
+ // It could certainly go all the way up to toAtLeastTxId.
+ return;
+ }
+ txId = next + 1;
+ }
+ throw new IOException(String.format("Gap in transactions. Expected to "
+ + "be able to read up until at least txid %d but unable to find any "
+ + "edit logs containing txid %d", toAtLeastTxId, txId));
+ }
/**
* Close all the streams in a collection
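To see checkForGaps in action, suppose selectInputStreams returned two finalized segments covering txids [1..100] and [101..200], with fromTxId = 1 and toAtLeastTxId = 250. A compressed trace of the loop:

    // streams = [1..100], [101..200]; fromTxId = 1; toAtLeastTxId = 250
    long txId = 1;
    txId = 100 + 1;   // after segment one: next expected txid is 101
    txId = 200 + 1;   // after segment two: next expected txid is 201
    // Iterator exhausted with 201 <= 250 -> IOException("Gap in transactions...").
    // Had segment two been in-progress (getLastTxId() == INVALID_TXID), the
    // method would have returned instead, since an open segment may still
    // reach txid 250.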
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Jun 6 00:17:38 2012
@@ -85,12 +85,10 @@ public class FSEditLogLoader {
*/
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
MetaRecoveryContext recovery) throws IOException {
- int logVersion = edits.getVersion();
-
fsNamesys.writeLock();
try {
long startTime = now();
- long numEdits = loadEditRecords(logVersion, edits, false,
+ long numEdits = loadEditRecords(edits, false,
expectedStartingTxId, recovery);
FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits
@@ -102,7 +100,7 @@ public class FSEditLogLoader {
}
}
- long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
+ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
long expectedStartingTxId, MetaRecoveryContext recovery)
throws IOException {
FSDirectory fsDir = fsNamesys.dir;
@@ -141,10 +139,10 @@ public class FSEditLogLoader {
}
} catch (Throwable e) {
// Handle a problem with our input
- check203UpgradeFailure(logVersion, e);
+ check203UpgradeFailure(in.getVersion(), e);
String errorMessage =
formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
- FSImage.LOG.error(errorMessage);
+ FSImage.LOG.error(errorMessage, e);
if (recovery == null) {
// We will only try to skip over problematic opcodes when in
// recovery mode.
@@ -158,7 +156,7 @@ public class FSEditLogLoader {
}
recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
in.getPosition();
- if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+ if (op.hasTransactionId()) {
if (op.getTransactionId() > expectedTxId) {
MetaRecoveryContext.editLogLoaderPrompt("There appears " +
"to be a gap in the edit log. We expected txid " +
@@ -175,7 +173,7 @@ public class FSEditLogLoader {
}
}
try {
- applyEditLogOp(op, fsDir, logVersion);
+ applyEditLogOp(op, fsDir, in.getVersion());
} catch (Throwable e) {
LOG.error("Encountered exception on operation " + op, e);
MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
@@ -192,7 +190,7 @@ public class FSEditLogLoader {
expectedTxId = lastAppliedTxId = expectedStartingTxId;
}
// log progress
- if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+ if (op.hasTransactionId()) {
long now = now();
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
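
The progress message here is rate-limited rather than emitted per transaction. A small self-contained sketch of that throttling pattern, with made-up values for the interval and counters:

public class ProgressSketch {
  public static void main(String[] args) {
    // Log progress at most once per interval; mirrors the pattern in
    // loadEditRecords. All values below are illustrative only.
    final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // ms
    long lastLogTime = 0;
    long numTxns = 400;
    long lastAppliedTxId = 100;

    long now = System.currentTimeMillis();
    if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
      int percent = Math.round((float) lastAppliedTxId / numTxns * 100);
      System.out.println("replaying edit log: " + percent + "% done");
      lastLogTime = now;
    }
  }
}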
@@ -647,112 +645,119 @@ public class FSEditLogLoader {
}
/**
- * Return the number of valid transactions in the stream. If the stream is
- * truncated during the header, returns a value indicating that there are
- * 0 valid transactions. This reads through the stream but does not close
- * it.
+ * Find the last valid transaction ID in the stream.
+ * If there are invalid or corrupt transactions in the middle of the stream,
+ * validateEditLog will skip over them.
+ * This reads through the stream but does not close it.
+ *
* @throws IOException if the stream cannot be read due to an IO error (eg
* if the log does not exist)
*/
static EditLogValidation validateEditLog(EditLogInputStream in) {
long lastPos = 0;
- long firstTxId = HdfsConstants.INVALID_TXID;
long lastTxId = HdfsConstants.INVALID_TXID;
long numValid = 0;
- try {
- FSEditLogOp op = null;
- while (true) {
- lastPos = in.getPosition();
+ FSEditLogOp op = null;
+ while (true) {
+ lastPos = in.getPosition();
+ try {
if ((op = in.readOp()) == null) {
break;
}
- if (firstTxId == HdfsConstants.INVALID_TXID) {
- firstTxId = op.getTransactionId();
- }
- if (lastTxId == HdfsConstants.INVALID_TXID
- || op.getTransactionId() == lastTxId + 1) {
- lastTxId = op.getTransactionId();
- } else {
- FSImage.LOG.error("Out of order txid found. Found " +
- op.getTransactionId() + ", expected " + (lastTxId + 1));
- break;
- }
- numValid++;
+ } catch (Throwable t) {
+ FSImage.LOG.warn("Caught exception after reading " + numValid +
+ " ops from " + in + " while determining its valid length." +
+ "Position was " + lastPos, t);
+ break;
+ }
+ if (lastTxId == HdfsConstants.INVALID_TXID
+ || op.getTransactionId() > lastTxId) {
+ lastTxId = op.getTransactionId();
}
- } catch (Throwable t) {
- // Catch Throwable and not just IOE, since bad edits may generate
- // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
- FSImage.LOG.debug("Caught exception after reading " + numValid +
- " ops from " + in + " while determining its valid length.", t);
+ numValid++;
}
- return new EditLogValidation(lastPos, firstTxId, lastTxId, false);
+ return new EditLogValidation(lastPos, lastTxId, false);
}
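
Note that the rewritten loop no longer requires consecutive txids; it records the highest txid seen and counts every op it can decode. A tiny sketch of that accounting over made-up txid values (INVALID_TXID stands in for HdfsConstants.INVALID_TXID):

public class MaxTxIdSketch {
  public static void main(String[] args) {
    final long INVALID_TXID = -12345;  // stand-in for the real constant
    long lastTxId = INVALID_TXID;
    long numValid = 0;
    // Out-of-order txids are tolerated; only the maximum is kept.
    for (long txid : new long[] {7, 8, 6, 10}) {
      if (lastTxId == INVALID_TXID || txid > lastTxId) {
        lastTxId = txid;
      }
      numValid++;
    }
    System.out.println("endTxId=" + lastTxId + " ops=" + numValid); // 10, 4
  }
}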
-
+
static class EditLogValidation {
private final long validLength;
- private final long startTxId;
private final long endTxId;
- private final boolean corruptionDetected;
-
- EditLogValidation(long validLength, long startTxId, long endTxId,
- boolean corruptionDetected) {
+ private final boolean hasCorruptHeader;
+
+ EditLogValidation(long validLength, long endTxId,
+ boolean hasCorruptHeader) {
this.validLength = validLength;
- this.startTxId = startTxId;
this.endTxId = endTxId;
- this.corruptionDetected = corruptionDetected;
+ this.hasCorruptHeader = hasCorruptHeader;
}
-
+
long getValidLength() { return validLength; }
-
- long getStartTxId() { return startTxId; }
-
+
long getEndTxId() { return endTxId; }
-
- long getNumTransactions() {
- if (endTxId == HdfsConstants.INVALID_TXID
- || startTxId == HdfsConstants.INVALID_TXID) {
- return 0;
- }
- return (endTxId - startTxId) + 1;
- }
-
- boolean hasCorruptHeader() { return corruptionDetected; }
+
+ boolean hasCorruptHeader() { return hasCorruptHeader; }
}
/**
* Stream wrapper that keeps track of the current stream position.
+ *
+ * This stream also allows us to set a limit on how many bytes we may
+ * read; any read or skip past that limit throws an exception.
*/
- public static class PositionTrackingInputStream extends FilterInputStream {
+ public static class PositionTrackingInputStream extends FilterInputStream
+ implements StreamLimiter {
private long curPos = 0;
private long markPos = -1;
+ private long limitPos = Long.MAX_VALUE;
public PositionTrackingInputStream(InputStream is) {
super(is);
}
+ private void checkLimit(long amt) throws IOException {
+ long extra = (curPos + amt) - limitPos;
+ if (extra > 0) {
+ throw new IOException("Tried to read " + amt + " byte(s) past " +
+ "the limit at offset " + limitPos);
+ }
+ }
+
+ @Override
public int read() throws IOException {
+ checkLimit(1);
int ret = super.read();
if (ret != -1) curPos++;
return ret;
}
+ @Override
public int read(byte[] data) throws IOException {
+ checkLimit(data.length);
int ret = super.read(data);
if (ret > 0) curPos += ret;
return ret;
}
+ @Override
public int read(byte[] data, int offset, int length) throws IOException {
+ checkLimit(length);
int ret = super.read(data, offset, length);
if (ret > 0) curPos += ret;
return ret;
}
+ @Override
+ public void setLimit(long limit) {
+ limitPos = curPos + limit;
+ }
+
+ @Override
public void mark(int limit) {
super.mark(limit);
markPos = curPos;
}
+ @Override
public void reset() throws IOException {
if (markPos == -1) {
throw new IOException("Not marked!");
@@ -765,6 +770,18 @@ public class FSEditLogLoader {
public long getPos() {
return curPos;
}
+
+ @Override
+ public long skip(long amt) throws IOException {
+ long extra = (curPos + amt) - limitPos;
+ if (extra > 0) {
+ throw new IOException("Tried to skip " + extra + " bytes past " +
+ "the limit at offset " + limitPos);
+ }
+ long ret = super.skip(amt);
+ curPos += ret;
+ return ret;
+ }
}
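
A short usage sketch of the new limit: wrap any InputStream, set a byte budget, and a read past that budget fails before touching the underlying stream. The eight zero bytes are arbitrary test input, and the sketch assumes the nested class is visible on the classpath:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.PositionTrackingInputStream;

public class LimitSketch {
  public static void main(String[] args) throws IOException {
    PositionTrackingInputStream in = new PositionTrackingInputStream(
        new ByteArrayInputStream(new byte[8]));
    in.setLimit(4);                  // budget of 4 bytes from here
    in.read(new byte[4]);            // ok: lands exactly on the limit
    try {
      in.read();                     // 5th byte: checkLimit(1) fails
    } catch (IOException e) {
      System.out.println("rejected as expected: " + e.getMessage());
    }
  }
}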
public long getLastAppliedTxId() {
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Wed Jun 6 00:17:38 2012
@@ -75,6 +75,10 @@ import java.io.EOFException;
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
+ /**
+ * Opcode size is limited to 1.5 megabytes
+ */
+ public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
@SuppressWarnings("deprecation")
@@ -2228,6 +2232,7 @@ public abstract class FSEditLogOp {
*/
public static class Reader {
private final DataInputStream in;
+ private final StreamLimiter limiter;
private final int logVersion;
private final Checksum checksum;
private final OpInstanceCache cache;
@@ -2238,7 +2243,7 @@ public abstract class FSEditLogOp {
* @param logVersion The version of the data coming from the stream.
*/
@SuppressWarnings("deprecation")
- public Reader(DataInputStream in, int logVersion) {
+ public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
this.logVersion = logVersion;
if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
this.checksum = new PureJavaCrc32();
@@ -2252,6 +2257,7 @@ public abstract class FSEditLogOp {
} else {
this.in = in;
}
+ this.limiter = limiter;
this.cache = new OpInstanceCache();
}
@@ -2263,30 +2269,76 @@ public abstract class FSEditLogOp {
*
* @param skipBrokenEdits If true, attempt to skip over damaged parts of
* the input stream, rather than throwing an IOException
- * @return the operation read from the stream, or null at the end of the file
- * @throws IOException on error.
+ * @return the operation read from the stream, or null at the end of the
+ * file
+ * @throws IOException on error. This function should only throw an
+ * exception when skipBrokenEdits is false.
*/
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
- FSEditLogOp op = null;
while (true) {
try {
- in.mark(in.available());
- try {
- op = decodeOp();
- } finally {
- // If we encountered an exception or an end-of-file condition,
- // do not advance the input stream.
- if (op == null) {
- in.reset();
- }
+ limiter.setLimit(MAX_OP_SIZE);
+ in.mark(MAX_OP_SIZE);
+ return decodeOp();
+ } catch (GarbageAfterTerminatorException e) {
+ in.reset();
+ if (!skipBrokenEdits) {
+ throw e;
+ }
+ // If we saw a terminator opcode followed by a long region of 0x00 or
+ // 0xff, we want to skip over that region, because there's nothing
+ // interesting there.
+ long numSkip = e.getNumAfterTerminator();
+ if (in.skip(numSkip) < numSkip) {
+ FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
+ "garbage after an OP_INVALID. Unexpected early EOF.");
+ return null;
}
- return op;
} catch (IOException e) {
+ in.reset();
if (!skipBrokenEdits) {
throw e;
}
- if (in.skip(1) < 1) {
- return null;
+ } catch (RuntimeException e) {
+ // FSEditLogOp#decodeOp is not supposed to throw RuntimeException.
+ // However, we handle it here for recovery mode, just to be more
+ // robust.
+ in.reset();
+ if (!skipBrokenEdits) {
+ throw e;
+ }
+ } catch (Throwable e) {
+ in.reset();
+ if (!skipBrokenEdits) {
+ throw new IOException("got unexpected exception " +
+ e.getMessage(), e);
+ }
+ }
+ // Move ahead one byte and re-try the decode process.
+ if (in.skip(1) < 1) {
+ return null;
+ }
+ }
+ }
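
The recovery path above follows one pattern: reset to the mark, rethrow unless broken edits may be skipped, then slide forward one byte and decode again. A stripped-down, self-contained model of that resync loop, with a toy decoder standing in for decodeOp:

import java.io.IOException;

public class ResyncSketch {
  // Toy decoder: treats byte 0x42 as the only valid "opcode".
  static Integer decodeAt(byte[] buf, int pos) throws IOException {
    if (buf[pos] != 0x42) {
      throw new IOException("bad opcode at " + pos);
    }
    return pos;
  }

  static Integer decodeWithResync(byte[] buf, boolean skipBroken)
      throws IOException {
    int pos = 0;
    while (pos < buf.length) {
      try {
        return decodeAt(buf, pos);   // may throw on corrupt input
      } catch (IOException e) {
        if (!skipBroken) {
          throw e;                   // strict mode: fail fast
        }
        pos++;                       // recovery mode: slide forward a byte
      }
    }
    return null;                     // ran out of input, like EOF
  }

  public static void main(String[] args) throws IOException {
    byte[] buf = {0x00, 0x13, 0x42};
    System.out.println(decodeWithResync(buf, true));   // prints 2
  }
}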
+
+ private void verifyTerminator() throws IOException {
+ long off = 0;
+ /* The end of the edit log should contain only 0x00 or 0xff bytes.
+ * If it contains other bytes, the log itself may be corrupt.
+ * It is important to check this; if we don't, a stray OP_INVALID byte
+ * could make us stop reading the edit log halfway through, and we'd never
+ * know that we had lost data.
+ */
+ byte[] buf = new byte[4096];
+ while (true) {
+ int numRead = in.read(buf);
+ if (numRead == -1) {
+ return;
+ }
+ for (int i = 0; i < numRead; i++, off++) {
+ if ((buf[i] != (byte)0) && (buf[i] != (byte)-1)) {
+ throw new GarbageAfterTerminatorException("Read garbage after " +
+ "the terminator!", off);
}
}
}
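
The rule verifyTerminator enforces is that every byte after the terminator must be 0x00 or 0xff. A standalone sketch of the same scan over made-up trailing bytes:

import java.io.IOException;

public class PaddingScanSketch {
  // Scan trailing bytes; only 0x00 and 0xff (-1 as a signed byte) may
  // follow the terminator. Report the offset of the first stray byte.
  static void verifyPadding(byte[] tail) throws IOException {
    for (long off = 0; off < tail.length; off++) {
      byte b = tail[(int) off];
      if (b != (byte) 0 && b != (byte) -1) {
        throw new IOException("garbage at " + off + " past terminator");
      }
    }
  }

  public static void main(String[] args) throws IOException {
    verifyPadding(new byte[] {0, 0, (byte) 0xff});   // ok
    verifyPadding(new byte[] {0, 0x37});             // throws at offset 1
  }
}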
@@ -2306,8 +2358,10 @@ public abstract class FSEditLogOp {
}
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
- if (opCode == OP_INVALID)
+ if (opCode == OP_INVALID) {
+ verifyTerminator();
return null;
+ }
FSEditLogOp op = cache.get(opCode);
if (op == null) {
@@ -2477,4 +2531,35 @@ public abstract class FSEditLogOp {
short mode = Short.valueOf(st.getValue("MODE"));
return new PermissionStatus(username, groupname, new FsPermission(mode));
}
- }
+
+ /**
+ * Exception indicating that we found an OP_INVALID followed by some
+ * garbage. An OP_INVALID should signify the end of the file... if there
+ * is additional content after that, then the edit log is corrupt.
+ */
+ static class GarbageAfterTerminatorException extends IOException {
+ private static final long serialVersionUID = 1L;
+ private final long numAfterTerminator;
+
+ public GarbageAfterTerminatorException(String str,
+ long numAfterTerminator) {
+ super(str);
+ this.numAfterTerminator = numAfterTerminator;
+ }
+
+ /**
+ * Get the number of bytes after the terminator at which the garbage
+ * appeared.
+ *
+ * So if you had an OP_INVALID followed immediately by another valid opcode,
+ * this would be 0.
+ * If you had an OP_INVALID followed by some padding bytes, followed by a
+ * stray byte at the end, this would be the number of padding bytes.
+ *
+ * @return the number of bytes between the terminator and the garbage
+ */
+ public long getNumAfterTerminator() {
+ return numAfterTerminator;
+ }
+ }
+}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Jun 6 00:17:38 2012
@@ -54,12 +54,14 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -88,9 +90,6 @@ public class FSImage implements Closeabl
private final NNStorageRetentionManager archivalManager;
- private SaveNamespaceContext curSaveNamespaceContext = null;
-
-
/**
* Construct an FSImage
* @param conf Configuration
@@ -536,6 +535,11 @@ public class FSImage implements Closeabl
return editLog;
}
+ @VisibleForTesting
+ void setEditLogForTesting(FSEditLog newLog) {
+ editLog = newLog;
+ }
+
void openEditLogForWrite() throws IOException {
assert editLog != null : "editLog must be initialized";
editLog.openForWrite();
@@ -555,7 +559,7 @@ public class FSImage implements Closeabl
/**
* Choose latest image from one of the directories,
- * load it and merge with the edits from that directory.
+ * load it and merge with the edits.
*
* Saving and loading fsimage should never trigger symlink resolution.
* The paths that are persisted do not have *intermediate* symlinks
@@ -591,7 +595,7 @@ public class FSImage implements Closeabl
// OK to not be able to read all of edits right now.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
- toAtLeastTxId, false);
+ toAtLeastTxId, recovery, false);
} else {
editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage);
@@ -599,7 +603,10 @@ public class FSImage implements Closeabl
LOG.debug("Planning to load image :\n" + imageFile);
for (EditLogInputStream l : editStreams) {
- LOG.debug("\t Planning to load edit stream: " + l);
+ LOG.debug("Planning to load edit log stream: " + l);
+ }
+ if (!editStreams.iterator().hasNext()) {
+ LOG.info("No edit log streams selected.");
}
try {
@@ -798,17 +805,28 @@ public class FSImage implements Closeabl
try {
thread.join();
} catch (InterruptedException iex) {
- LOG.error("Caught exception while waiting for thread " +
+ LOG.error("Caught interrupted exception while waiting for thread " +
thread.getName() + " to finish. Retrying join");
}
}
}
}
+
+ /**
+ * @see #saveNamespace(FSNamesystem, Canceler)
+ */
+ public synchronized void saveNamespace(FSNamesystem source)
+ throws IOException {
+ saveNamespace(source, null);
+ }
+
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
+ * @param canceler signals cancellation of the save; may be null
*/
- public synchronized void saveNamespace(FSNamesystem source) throws IOException {
+ public synchronized void saveNamespace(FSNamesystem source,
+ Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@@ -819,7 +837,7 @@ public class FSImage implements Closeabl
}
long imageTxId = getLastAppliedOrWrittenTxId();
try {
- saveFSImageInAllDirs(source, imageTxId);
+ saveFSImageInAllDirs(source, imageTxId, canceler);
storage.writeAll();
} finally {
if (editLogWasOpen) {
@@ -831,27 +849,27 @@ public class FSImage implements Closeabl
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
}
-
- }
-
- public void cancelSaveNamespace(String reason)
- throws InterruptedException {
- SaveNamespaceContext ctx = curSaveNamespaceContext;
- if (ctx != null) {
- ctx.cancel(reason); // waits until complete
- }
}
-
+ /**
+ * @see #saveFSImageInAllDirs(FSNamesystem, long, Canceler)
+ */
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
+ throws IOException {
+ saveFSImageInAllDirs(source, txid, null);
+ }
+
+ protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
+ Canceler canceler)
throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
-
+ if (canceler == null) {
+ canceler = new Canceler();
+ }
SaveNamespaceContext ctx = new SaveNamespaceContext(
- source, txid);
- curSaveNamespaceContext = ctx;
+ source, txid, canceler);
try {
List<Thread> saveThreads = new ArrayList<Thread>();
@@ -872,7 +890,7 @@ public class FSImage implements Closeabl
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
}
- if (ctx.isCancelled()) {
+ if (canceler.isCancelled()) {
deleteCancelledCheckpoint(txid);
ctx.checkCancelled(); // throws
assert false : "should have thrown above!";
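
The Canceler replaces the stored SaveNamespaceContext as the cancellation hand-off: the canceling thread sets a flag and the saver polls it, amortized over every 50 iterations as saveImage now does. A self-contained model of that hand-off, using a toy stand-in for org.apache.hadoop.hdfs.util.Canceler (the real class's API may differ):

import java.io.IOException;

public class CancelSketch {
  // Minimal stand-in for Canceler: a flag the worker polls, mirroring
  // ctx.checkCancelled() in the patch.
  static class ToyCanceler {
    private volatile String reason;
    void cancel(String why) { reason = why; }
    boolean isCancelled() { return reason != null; }
    void checkCancelled() throws IOException {
      if (reason != null) throw new IOException("cancelled: " + reason);
    }
  }

  public static void main(String[] args) throws Exception {
    final ToyCanceler canceler = new ToyCanceler();
    Thread worker = new Thread(new Runnable() {
      public void run() {
        try {
          for (int i = 0; ; i++) {
            if (i % 50 == 0) canceler.checkCancelled(); // amortized check
          }
        } catch (IOException e) {
          System.out.println(e.getMessage());
        }
      }
    });
    worker.start();
    canceler.cancel("shutting down");   // from another thread
    worker.join();
  }
}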
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Jun 6 00:17:38 2012
@@ -540,7 +540,6 @@ class FSImageFormat {
private void saveImage(ByteBuffer currentDirName,
INodeDirectory current,
DataOutputStream out) throws IOException {
- context.checkCancelled();
List<INode> children = current.getChildrenRaw();
if (children == null || children.isEmpty())
return;
@@ -554,9 +553,13 @@ class FSImageFormat {
out.write(currentDirName.array(), 0, prefixLen);
}
out.writeInt(children.size());
+ int i = 0;
for(INode child : children) {
// print all children first
FSImageSerialization.saveINode2Image(child, out);
+ if (i++ % 50 == 0) {
+ context.checkCancelled();
+ }
}
for(INode child : children) {
if(!child.isDirectory())