You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:07 UTC
svn commit: r1399950 [13/27] - in
/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./
hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apac...
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.security.t
import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@@ -82,7 +83,7 @@ import com.google.common.collect.Lists;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class FSEditLog {
+public class FSEditLog implements LogsPurgeable {
static final Log LOG = LogFactory.getLog(FSEditLog.class);
@@ -173,6 +174,7 @@ public class FSEditLog {
// stores the most current transactionId of this thread.
private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
+ @Override
protected synchronized TransactionId initialValue() {
return new TransactionId(Long.MAX_VALUE);
}
@@ -332,6 +334,39 @@ public class FSEditLog {
state = State.CLOSED;
}
+
+ /**
+ * Format all configured journals which are not file-based.
+ *
+ * File-based journals are skipped, since they are formatted by the
+ * Storage format code.
+ */
+ synchronized void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException {
+ Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
+ "Bad state: %s", state);
+
+ for (JournalManager jm : journalSet.getJournalManagers()) {
+ if (!(jm instanceof FileJournalManager)) {
+ jm.format(nsInfo);
+ }
+ }
+ }
+
+ synchronized List<FormatConfirmable> getFormatConfirmables() {
+ Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
+ "Bad state: %s", state);
+
+ List<FormatConfirmable> ret = Lists.newArrayList();
+ for (final JournalManager jm : journalSet.getJournalManagers()) {
+ // The FJMs are confirmed separately since they are also
+ // StorageDirectories
+ if (!(jm instanceof FileJournalManager)) {
+ ret.add(jm);
+ }
+ }
+ return ret;
+ }
+
/**
* Write an operation to the edit log. Do not sync to persistent
* store yet.
@@ -544,8 +579,9 @@ public class FSEditLog {
editLogStream.setReadyToFlush();
} catch (IOException e) {
final String msg =
- "Could not sync enough journals to persistent storage. "
- + "Unsynced transactions: " + (txid - synctxid);
+ "Could not sync enough journals to persistent storage " +
+ "due to " + e.getMessage() + ". " +
+ "Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
terminate(1, msg);
}
@@ -621,7 +657,7 @@ public class FSEditLog {
public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
AddOp op = AddOp.getInstance(cache.get())
.setPath(path)
- .setReplication(newNode.getReplication())
+ .setReplication(newNode.getBlockReplication())
.setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize())
@@ -639,7 +675,7 @@ public class FSEditLog {
public void logCloseFile(String path, INodeFile newNode) {
CloseOp op = CloseOp.getInstance(cache.get())
.setPath(path)
- .setReplication(newNode.getReplication())
+ .setReplication(newNode.getBlockReplication())
.setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize())
@@ -996,6 +1032,7 @@ public class FSEditLog {
/**
* Archive any log files that are older than the given txid.
*/
+ @Override
public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
minTxIdToKeep <= curSegmentTxId :
@@ -1134,8 +1171,14 @@ public class FSEditLog {
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {
// All journals have failed, it is handled in logSync.
+ // TODO: are we sure this is OK?
}
}
+
+ public void selectInputStreams(Collection<EditLogInputStream> streams,
+ long fromTxId, boolean inProgressOk) {
+ journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+ }
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId) throws IOException {
@@ -1153,7 +1196,7 @@ public class FSEditLog {
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
- journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+ selectInputStreams(streams, fromTxId, inProgressOk);
try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -29,12 +29,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
@@ -193,8 +191,9 @@ public class FSEditLogLoader {
if (op.hasTransactionId()) {
long now = now();
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
- int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
- LOG.info("replaying edit log: " + lastAppliedTxId + "/" + numTxns
+ long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
+ int percent = Math.round((float) deltaTxId / numTxns * 100);
+ LOG.info("replaying edit log: " + deltaTxId + "/" + numTxns
+ " transactions completed. (" + percent + "%)");
lastLogTime = now;
}
@@ -303,7 +302,9 @@ public class FSEditLogLoader {
addCloseOp.path);
}
- // Update in-memory data structures
+ // Update the salient file attributes.
+ oldFile.setAccessTime(addCloseOp.atime);
+ oldFile.setModificationTimeForce(addCloseOp.mtime);
updateBlocks(fsDir, addCloseOp, oldFile);
// Now close the file
@@ -591,13 +592,13 @@ public class FSEditLogLoader {
// what about an old-version fsync() where fsync isn't called
// until several blocks in?
newBI = new BlockInfoUnderConstruction(
- newBlock, file.getReplication());
+ newBlock, file.getBlockReplication());
} else {
// OP_CLOSE should add finalized blocks. This code path
// is only executed when loading edits written by prior
// versions of Hadoop. Current versions always log
// OP_ADD operations as each block is allocated.
- newBI = new BlockInfo(newBlock, file.getReplication());
+ newBI = new BlockInfo(newBlock, file.getBlockReplication());
}
fsNamesys.getBlockManager().addBlockCollection(newBI, file);
file.addBlock(newBI);
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Oct 19 02:25:55 2012
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Options.Rena
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
@@ -183,6 +182,7 @@ public abstract class FSEditLogOp {
return (T)this;
}
+ @Override
public String getPath() {
return path;
}
@@ -216,6 +216,7 @@ public abstract class FSEditLogOp {
return (T)this;
}
+ @Override
public Block[] getBlocks() {
return blocks;
}
@@ -409,6 +410,7 @@ public abstract class FSEditLogOp {
return (AddOp)cache.get(OP_ADD);
}
+ @Override
public boolean shouldCompleteLastBlock() {
return false;
}
@@ -431,6 +433,7 @@ public abstract class FSEditLogOp {
return (CloseOp)cache.get(OP_CLOSE);
}
+ @Override
public boolean shouldCompleteLastBlock() {
return true;
}
@@ -462,6 +465,7 @@ public abstract class FSEditLogOp {
return this;
}
+ @Override
public String getPath() {
return path;
}
@@ -471,6 +475,7 @@ public abstract class FSEditLogOp {
return this;
}
+ @Override
public Block[] getBlocks() {
return blocks;
}
@@ -2082,6 +2087,7 @@ public abstract class FSEditLogOp {
return (LogSegmentOp)cache.get(code);
}
+ @Override
public void readFields(DataInputStream in, int logVersion)
throws IOException {
// no data stored in these ops yet
@@ -2174,6 +2180,7 @@ public abstract class FSEditLogOp {
WritableFactories.setFactory
(BlockTwo.class,
new WritableFactory() {
+ @Override
public Writable newInstance() { return new BlockTwo(); }
});
}
@@ -2186,11 +2193,13 @@ public abstract class FSEditLogOp {
/////////////////////////////////////
// Writable
/////////////////////////////////////
+ @Override
public void write(DataOutput out) throws IOException {
out.writeLong(blkid);
out.writeLong(len);
}
+ @Override
public void readFields(DataInput in) throws IOException {
this.blkid = in.readLong();
this.len = in.readLong();
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Oct 19 02:25:55 2012
@@ -38,12 +38,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
+
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -57,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
@@ -88,13 +92,11 @@ public class FSImage implements Closeabl
final private Configuration conf;
- private final NNStorageRetentionManager archivalManager;
+ protected NNStorageRetentionManager archivalManager;
/**
* Construct an FSImage
* @param conf Configuration
- * @see #FSImage(Configuration conf,
- * Collection imageDirs, Collection editsDirs)
* @throws IOException if default directories are invalid.
*/
public FSImage(Configuration conf) throws IOException {
@@ -138,11 +140,34 @@ public class FSImage implements Closeabl
fileCount + " files");
NamespaceInfo ns = NNStorage.newNamespaceInfo();
ns.clusterID = clusterId;
+
storage.format(ns);
+ editLog.formatNonFileJournals(ns);
saveFSImageInAllDirs(fsn, 0);
}
/**
+ * Check whether the storage directories and non-file journals exist.
+ * If running in interactive mode, will prompt the user for each
+ * directory to allow them to format anyway. Otherwise, returns
+ * false, unless 'force' is specified.
+ *
+ * @param force format regardless of whether dirs exist
+ * @param interactive prompt the user when a dir exists
+ * @return true if formatting should proceed
+ * @throws IOException if some storage cannot be accessed
+ */
+ boolean confirmFormat(boolean force, boolean interactive) throws IOException {
+ List<FormatConfirmable> confirms = Lists.newArrayList();
+ for (StorageDirectory sd : storage.dirIterable(null)) {
+ confirms.add(sd);
+ }
+
+ confirms.addAll(editLog.getFormatConfirmables());
+ return Storage.confirmFormat(confirms, force, interactive);
+ }
+
+ /**
* Analyze storage directories.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
@@ -165,8 +190,6 @@ public class FSImage implements Closeabl
throw new IOException(
"All specified directories are not accessible or do not exist.");
- storage.setUpgradeManager(target.upgradeManager);
-
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
Map<StorageDirectory, StorageState> dataDirStates =
@@ -201,9 +224,6 @@ public class FSImage implements Closeabl
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
- // check whether distributed upgrade is required and/or should be continued
- storage.verifyDistributedUpgradeProgress(startOpt);
-
// 2. Format unformatted dirs.
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@@ -294,13 +314,6 @@ public class FSImage implements Closeabl
}
private void doUpgrade(FSNamesystem target) throws IOException {
- if(storage.getDistributedUpgradeState()) {
- // only distributed upgrade need to continue
- // don't do version upgrade
- this.loadFSImage(target, null);
- storage.initializeDistributedUpgrade();
- return;
- }
// Upgrade is allowed only if there are
// no previous fs states in any of the directories
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
@@ -383,7 +396,6 @@ public class FSImage implements Closeabl
+ storage.getRemovedStorageDirs().size()
+ " storage directory(ies), previously logged.");
}
- storage.initializeDistributedUpgrade();
}
private void doRollback() throws IOException {
@@ -446,8 +458,6 @@ public class FSImage implements Closeabl
LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
}
isUpgradeFinalized = true;
- // check whether name-node can start in regular mode
- storage.verifyDistributedUpgradeProgress(StartupOption.REGULAR);
}
private void doFinalize(StorageDirectory sd) throws IOException {
@@ -546,8 +556,7 @@ public class FSImage implements Closeabl
* file.
*/
void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
- target.dir.reset();
-
+ target.clear();
LOG.debug("Reloading namespace from " + file);
loadFSImage(file, target, null);
}
@@ -664,7 +673,7 @@ public class FSImage implements Closeabl
final long checkpointTxnCount = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
- long checkpointAge = System.currentTimeMillis() - imageFile.lastModified();
+ long checkpointAge = Time.now() - imageFile.lastModified();
return (checkpointAge > checkpointPeriod * 1000) ||
(numEditsLoaded > checkpointTxnCount);
@@ -761,7 +770,7 @@ public class FSImage implements Closeabl
saver.save(newFile, compression);
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
- storage.setMostRecentCheckpointInfo(txid, Util.now());
+ storage.setMostRecentCheckpointInfo(txid, Time.now());
}
/**
@@ -784,6 +793,7 @@ public class FSImage implements Closeabl
this.sd = sd;
}
+ @Override
public void run() {
try {
saveFSImage(context, sd);
@@ -797,6 +807,7 @@ public class FSImage implements Closeabl
}
}
+ @Override
public String toString() {
return "FSImageSaver for " + sd.getRoot() +
" of type " + sd.getStorageDirType();
@@ -1076,10 +1087,11 @@ public class FSImage implements Closeabl
// advertise it as such to other checkpointers
// from now on
if (txid > storage.getMostRecentCheckpointTxId()) {
- storage.setMostRecentCheckpointInfo(txid, Util.now());
+ storage.setMostRecentCheckpointInfo(txid, Time.now());
}
}
+ @Override
synchronized public void close() throws IOException {
if (editLog != null) { // 2NN doesn't have any edit log
getEditLog().close();
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Fri Oct 19 02:25:55 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdfs.Deprecated
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
@@ -127,7 +126,7 @@ public class FSImageSerialization {
String path)
throws IOException {
writeString(path, out);
- out.writeShort(cons.getReplication());
+ out.writeShort(cons.getBlockReplication());
out.writeLong(cons.getModificationTime());
out.writeLong(cons.getPreferredBlockSize());
int nrBlocks = cons.getBlocks().length;
@@ -176,7 +175,7 @@ public class FSImageSerialization {
filePerm);
} else {
INodeFile fileINode = (INodeFile)node;
- out.writeShort(fileINode.getReplication());
+ out.writeShort(fileINode.getBlockReplication());
out.writeLong(fileINode.getModificationTime());
out.writeLong(fileINode.getAccessTime());
out.writeLong(fileINode.getPreferredBlockSize());
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Fri Oct 19 02:25:55 2012
@@ -114,6 +114,7 @@ class FSImageTransactionalStorageInspect
*
* @throws FileNotFoundException if no images are found.
*/
+ @Override
FSImageFile getLatestImage() throws IOException {
if (foundImages.isEmpty()) {
throw new FileNotFoundException("No valid image files found");
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 19 02:25:55 2012
@@ -19,12 +19,18 @@ package org.apache.hadoop.hdfs.server.na
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -62,7 +68,7 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
@@ -129,7 +135,6 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -153,7 +158,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
@@ -165,6 +169,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -172,7 +177,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
@@ -190,6 +194,8 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
@@ -232,6 +238,7 @@ public class FSNamesystem implements Nam
private static final ThreadLocal<StringBuilder> auditBuffer =
new ThreadLocal<StringBuilder>() {
+ @Override
protected StringBuilder initialValue() {
return new StringBuilder();
}
@@ -292,7 +299,7 @@ public class FSNamesystem implements Nam
// Scan interval is not configurable.
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
- private final DelegationTokenSecretManager dtSecretManager;
+ final DelegationTokenSecretManager dtSecretManager;
private final boolean alwaysUseDelegationTokensForTests;
@@ -360,6 +367,23 @@ public class FSNamesystem implements Nam
private final boolean haEnabled;
/**
+ * Clear all loaded data
+ */
+ void clear() {
+ dir.reset();
+ dtSecretManager.reset();
+ generationStamp.setStamp(GenerationStamp.FIRST_VALID_STAMP);
+ leaseManager.removeAllLeases();
+ }
+
+ @VisibleForTesting
+ LeaseManager getLeaseManager() {
+ return leaseManager;
+ }
+
+ /**
+
+ /**
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
*
@@ -392,12 +416,14 @@ public class FSNamesystem implements Nam
throws IOException {
if (namespaceDirs.size() == 1) {
- LOG.warn("Only one " + DFS_NAMENODE_NAME_DIR_KEY
- + " directory configured , beware data loss!");
+ LOG.warn("Only one image storage directory ("
+ + DFS_NAMENODE_NAME_DIR_KEY + ") configured. Beware of dataloss"
+ + " due to lack of redundant storage directories!");
}
if (namespaceEditsDirs.size() == 1) {
- LOG.warn("Only one " + DFS_NAMENODE_EDITS_DIR_KEY
- + " directory configured , beware data loss!");
+ LOG.warn("Only one namespace edits storage directory ("
+ + DFS_NAMENODE_EDITS_DIR_KEY + ") configured. Beware of dataloss"
+ + " due to lack of redundant storage directories!");
}
FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs);
@@ -467,12 +493,25 @@ public class FSNamesystem implements Nam
"must not be specified if HA is not enabled.");
}
+ // Get the checksum type from config
+ String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
+ DataChecksum.Type checksumType;
+ try {
+ checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
+ } catch (IllegalArgumentException iae) {
+ throw new IOException("Invalid checksum type in "
+ + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
+ }
+
this.serverDefaults = new FsServerDefaults(
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
- conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+ conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+ conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
+ conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
+ checksumType);
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
@@ -619,6 +658,7 @@ public class FSNamesystem implements Nam
LOG.info("Catching up to latest edits from old active before " +
"taking over writer role in edits logs.");
editLogTailer.catchupDuringFailover();
+ blockManager.setPostponeBlocksFromFuture(false);
LOG.info("Reprocessing replication and invalidation queues...");
blockManager.getDatanodeManager().markAllDatanodesStale();
@@ -702,6 +742,9 @@ public class FSNamesystem implements Nam
// During startup, we're already open for read.
dir.fsImage.editLog.initSharedJournalsForRead();
}
+
+ blockManager.setPostponeBlocksFromFuture(true);
+
editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start();
if (standbyShouldCheckpoint) {
@@ -913,8 +956,7 @@ public class FSNamesystem implements Nam
NamespaceInfo unprotectedGetNamespaceInfo() {
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
getClusterId(), getBlockPoolId(),
- dir.fsImage.getStorage().getCTime(),
- upgradeManager.getUpgradeVersion());
+ dir.fsImage.getStorage().getCTime());
}
/**
@@ -1030,7 +1072,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setPermission", src, null, null);
}
throw e;
@@ -1059,7 +1101,7 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setPermission", src, null, resultingStat);
}
}
@@ -1076,7 +1118,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setOwner", src, null, null);
}
throw e;
@@ -1114,7 +1156,7 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setOwner", src, null, resultingStat);
}
}
@@ -1131,6 +1173,14 @@ public class FSNamesystem implements Nam
if (blocks != null) {
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());
+
+ LocatedBlock lastBlock = blocks.getLastLocatedBlock();
+ if (lastBlock != null) {
+ ArrayList<LocatedBlock> lastBlockList = new ArrayList<LocatedBlock>();
+ lastBlockList.add(lastBlock);
+ blockManager.getDatanodeManager().sortLocatedBlocks(
+ clientMachine, lastBlockList);
+ }
}
return blocks;
}
@@ -1149,7 +1199,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"open", src, null, null);
}
throw e;
@@ -1175,7 +1225,7 @@ public class FSNamesystem implements Nam
offset, length, doAccessTime, needBlockToken);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"open", src, null, null);
}
if (checkSafeMode && isInSafeMode()) {
@@ -1260,7 +1310,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getLoginUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"concat", Arrays.toString(srcs), target, null);
}
throw e;
@@ -1310,7 +1360,7 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getLoginUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"concat", Arrays.toString(srcs), target, resultingStat);
}
}
@@ -1361,7 +1411,7 @@ public class FSNamesystem implements Nam
}
si.add(trgInode);
- short repl = trgInode.getReplication();
+ short repl = trgInode.getBlockReplication();
// now check the srcs
boolean endSrc = false; // final src file doesn't have to have full end block
@@ -1381,10 +1431,10 @@ public class FSNamesystem implements Nam
}
// check replication and blocks size
- if(repl != srcInode.getReplication()) {
+ if(repl != srcInode.getBlockReplication()) {
throw new IllegalArgumentException(src + " and " + target + " " +
"should have same replication: "
- + repl + " vs. " + srcInode.getReplication());
+ + repl + " vs. " + srcInode.getBlockReplication());
}
//boolean endBlock=false;
@@ -1427,7 +1477,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setTimes", src, null, null);
}
throw e;
@@ -1454,7 +1504,7 @@ public class FSNamesystem implements Nam
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setTimes", src, null, stat);
}
} else {
@@ -1476,7 +1526,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"createSymlink", link, target, null);
}
throw e;
@@ -1504,7 +1554,7 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"createSymlink", link, target, resultingStat);
}
}
@@ -1560,7 +1610,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setReplication", src, null, null);
}
throw e;
@@ -1596,7 +1646,7 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"setReplication", src, null, null);
}
return isFile;
@@ -1653,7 +1703,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"create", src, null, null);
}
throw e;
@@ -1678,7 +1728,7 @@ public class FSNamesystem implements Nam
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"create", src, null, stat);
}
}
@@ -1744,8 +1794,6 @@ public class FSNamesystem implements Nam
try {
INodeFile myFile = dir.getFileINode(src);
- recoverLeaseInternal(myFile, src, holder, clientMachine, false);
-
try {
blockManager.verifyReplication(src, replication, clientMachine);
} catch(IOException e) {
@@ -1761,10 +1809,15 @@ public class FSNamesystem implements Nam
// File exists - must be one of append or overwrite
if (overwrite) {
delete(src, true);
- } else if (!append) {
- throw new FileAlreadyExistsException("failed to create file " + src
- + " on client " + clientMachine
- + " because the file exists");
+ } else {
+ // Opening an existing file for write - may need to recover lease.
+ recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+
+ if (!append) {
+ throw new FileAlreadyExistsException("failed to create file " + src
+ + " on client " + clientMachine
+ + " because the file exists");
+ }
}
}
@@ -1824,7 +1877,7 @@ public class FSNamesystem implements Nam
boolean writeToEditLog) throws IOException {
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
file.getLocalNameBytes(),
- file.getReplication(),
+ file.getBlockReplication(),
file.getModificationTime(),
file.getPreferredBlockSize(),
file.getBlocks(),
@@ -1973,7 +2026,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"append", src, null, null);
}
throw e;
@@ -2011,7 +2064,7 @@ public class FSNamesystem implements Nam
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"append", src, null, null);
}
return lb;
@@ -2023,6 +2076,7 @@ public class FSNamesystem implements Nam
void setBlockPoolId(String bpid) {
blockPoolId = bpid;
+ blockManager.setBlockPoolId(blockPoolId);
}
/**
@@ -2137,7 +2191,7 @@ public class FSNamesystem implements Nam
fileLength = pendingFile.computeContentSummary().getLength();
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
- replication = pendingFile.getReplication();
+ replication = pendingFile.getBlockReplication();
} finally {
writeUnlock();
}
@@ -2381,7 +2435,7 @@ public class FSNamesystem implements Nam
* them into invalidateBlocks.
*/
private void checkReplicationFactor(INodeFile file) {
- short numExpectedReplicas = file.getReplication();
+ short numExpectedReplicas = file.getBlockReplication();
Block[] pendingBlocks = file.getBlocks();
int nrBlocks = pendingBlocks.length;
for (int i = 0; i < nrBlocks; i++) {
@@ -2476,7 +2530,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"rename", src, dst, null);
}
throw e;
@@ -2505,7 +2559,7 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"rename", src, dst, resultingStat);
}
return status;
@@ -2565,7 +2619,7 @@ public class FSNamesystem implements Nam
for (Rename option : options) {
cmd.append(option.value()).append(" ");
}
- logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
+ logAuditEvent(UserGroupInformation.getCurrentUser(), getRemoteIp(),
cmd.toString(), src, dst, resultingStat);
}
}
@@ -2603,7 +2657,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"delete", src, null, null);
}
throw e;
@@ -2619,7 +2673,7 @@ public class FSNamesystem implements Nam
boolean status = deleteInternal(src, recursive, true);
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"delete", src, null, null);
}
return status;
@@ -2757,8 +2811,11 @@ public class FSNamesystem implements Nam
*/
HdfsFileStatus getFileInfo(String src, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException,
- StandbyException {
+ StandbyException, IOException {
+ HdfsFileStatus stat = null;
+
readLock();
+
try {
checkOperation(OperationCategory.READ);
@@ -2768,10 +2825,23 @@ public class FSNamesystem implements Nam
if (isPermissionEnabled) {
checkTraverse(src);
}
- return dir.getFileInfo(src, resolveLink);
+ stat = dir.getFileInfo(src, resolveLink);
+ } catch (AccessControlException e) {
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+ getRemoteIp(),
+ "getfileinfo", src, null, null);
+ }
+ throw e;
} finally {
readUnlock();
}
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ logAuditEvent(UserGroupInformation.getCurrentUser(),
+ getRemoteIp(),
+ "getfileinfo", src, null, null);
+ }
+ return stat;
}
/**
@@ -2784,7 +2854,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"mkdirs", src, null, null);
}
throw e;
@@ -2809,7 +2879,7 @@ public class FSNamesystem implements Nam
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"mkdirs", src, null, stat);
}
return status;
@@ -3084,7 +3154,7 @@ public class FSNamesystem implements Nam
if (diff > 0) {
try {
String path = leaseManager.findPath(fileINode);
- dir.updateSpaceConsumed(path, 0, -diff * fileINode.getReplication());
+ dir.updateSpaceConsumed(path, 0, -diff * fileINode.getBlockReplication());
} catch (IOException e) {
LOG.warn("Unexpected exception while updating disk space.", e);
}
@@ -3250,7 +3320,7 @@ public class FSNamesystem implements Nam
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"listStatus", src, null, null);
}
throw e;
@@ -3274,7 +3344,7 @@ public class FSNamesystem implements Nam
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
+ getRemoteIp(),
"listStatus", src, null, null);
}
dl = dir.getListing(src, startAfter, needLocation);
@@ -3354,13 +3424,6 @@ public class FSNamesystem implements Nam
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
- if (cmds == null || cmds.length == 0) {
- DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
- if (cmd != null) {
- cmds = new DatanodeCommand[] {cmd};
- }
- }
-
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
} finally {
readUnlock();
@@ -3727,6 +3790,11 @@ public class FSNamesystem implements Nam
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
this.safeReplication = conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY,
DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+
+ LOG.info(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY + " = " + threshold);
+ LOG.info(DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY + " = " + datanodeThreshold);
+ LOG.info(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + " = " + extension);
+
// default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
this.replQueueThreshold =
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
@@ -3796,24 +3864,9 @@ public class FSNamesystem implements Nam
/**
* Leave safe mode.
* <p>
- * Switch to manual safe mode if distributed upgrade is required.<br>
* Check for invalid, under- & over-replicated blocks in the end of startup.
*/
- private synchronized void leave(boolean checkForUpgrades) {
- if(checkForUpgrades) {
- // verify whether a distributed upgrade needs to be started
- boolean needUpgrade = false;
- try {
- needUpgrade = upgradeManager.startUpgrade();
- } catch(IOException e) {
- FSNamesystem.LOG.error("IOException in startDistributedUpgradeIfNeeded", e);
- }
- if(needUpgrade) {
- // switch to manual safe mode
- safeMode = new SafeModeInfo(false);
- return;
- }
- }
+ private synchronized void leave() {
// if not done yet, initialize replication queues.
// In the standby, do not populate repl queues
if (!isPopulatingReplQueues() && !isInStandbyState()) {
@@ -3907,7 +3960,7 @@ public class FSNamesystem implements Nam
// the threshold is reached
if (!isOn() || // safe mode is off
extension <= 0 || threshold <= 0) { // don't need to wait
- this.leave(true); // leave safe mode
+ this.leave(); // leave safe mode
return;
}
if (reached > 0) { // threshold has already been reached before
@@ -4006,15 +4059,14 @@ public class FSNamesystem implements Nam
return "Safe mode is OFF.";
String leaveMsg = "";
if (areResourcesLow()) {
- leaveMsg = "Resources are low on NN. Safe mode must be turned off manually";
+ leaveMsg = "Resources are low on NN. "
+ + "Please add or free up more resources then turn off safe mode manually. "
+ + "NOTE: If you turn off safe mode before adding resources, "
+ + "the NN will immediately return to safe mode.";
} else {
leaveMsg = "Safe mode will be turned off automatically";
}
if(isManual()) {
- if(upgradeManager.getUpgradeState())
- return leaveMsg + " upon completion of " +
- "the distributed upgrade: upgrade progress = " +
- upgradeManager.getUpgradeStatus() + "%";
leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
}
@@ -4137,6 +4189,7 @@ public class FSNamesystem implements Nam
/**
*/
+ @Override
public void run() {
while (fsRunning && (safeMode != null && !safeMode.canLeave())) {
try {
@@ -4148,13 +4201,7 @@ public class FSNamesystem implements Nam
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread. ");
} else {
// leave safe mode and stop the monitor
- try {
- leaveSafeMode(true);
- } catch(SafeModeException es) { // should never happen
- String msg = "SafeModeMonitor may not run during distributed upgrade.";
- assert false : msg;
- throw new RuntimeException(msg, es);
- }
+ leaveSafeMode();
}
smmthread = null;
}
@@ -4165,7 +4212,7 @@ public class FSNamesystem implements Nam
checkSuperuserPrivilege();
switch(action) {
case SAFEMODE_LEAVE: // leave safe mode
- leaveSafeMode(false);
+ leaveSafeMode();
break;
case SAFEMODE_ENTER: // enter safe mode
enterSafeMode(false);
@@ -4241,6 +4288,7 @@ public class FSNamesystem implements Nam
* @param deltaSafe the change in number of safe blocks
* @param deltaTotal the change i nnumber of total blocks expected
*/
+ @Override
public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
@@ -4349,17 +4397,14 @@ public class FSNamesystem implements Nam
* Leave safe mode.
* @throws IOException
*/
- void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
+ void leaveSafeMode() {
writeLock();
try {
if (!isInSafeMode()) {
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
return;
}
- if(upgradeManager.getUpgradeState())
- throw new SafeModeException("Distributed upgrade is in progress",
- safeMode);
- safeMode.leave(checkForUpgrades);
+ safeMode.leave();
} finally {
writeUnlock();
}
@@ -4381,6 +4426,7 @@ public class FSNamesystem implements Nam
writeLock();
try {
checkOperation(OperationCategory.JOURNAL);
+ checkSuperuserPrivilege();
if (isInSafeMode()) {
throw new SafeModeException("Log not rolled", safeMode);
}
@@ -4434,18 +4480,6 @@ public class FSNamesystem implements Nam
return (blockManager.getBlockCollection(b) != null);
}
- // Distributed upgrade manager
- final UpgradeManagerNamenode upgradeManager = new UpgradeManagerNamenode(this);
-
- UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
- ) throws IOException {
- return upgradeManager.distributedUpgradeProgress(action);
- }
-
- UpgradeCommand processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
- return upgradeManager.processUpgradeCommand(comm);
- }
-
PermissionStatus createFsOwnerPermissions(FsPermission permission) {
return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
}
@@ -4643,6 +4677,13 @@ public class FSNamesystem implements Nam
public int getNumDeadDataNodes() {
return getBlockManager().getDatanodeManager().getNumDeadDataNodes();
}
+
+ @Override // FSNamesystemMBean
+ @Metric({"StaleDataNodes",
+ "Number of datanodes marked stale due to delayed heartbeat"})
+ public int getNumStaleDataNodes() {
+ return getBlockManager().getDatanodeManager().getNumStaleNodes();
+ }
/**
* Sets the generation stamp for this filesystem
@@ -4963,6 +5004,7 @@ public class FSNamesystem implements Nam
block = b;
}
+ @Override
public String toString() {
return block.getBlockName() + "\t" + path;
}
@@ -5244,7 +5286,15 @@ public class FSNamesystem implements Nam
* RPC call context even if the client exits.
*/
private boolean isExternalInvocation() {
- return Server.isRpcInvocation();
+ return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
+ }
+
+ private static InetAddress getRemoteIp() {
+ InetAddress ip = Server.getRemoteIp();
+ if (ip != null) {
+ return ip;
+ }
+ return NamenodeWebHdfsMethods.getRemoteIp();
}
/**
@@ -5411,7 +5461,7 @@ public class FSNamesystem implements Nam
}
private long getLastContact(DatanodeDescriptor alivenode) {
- return (System.currentTimeMillis() - alivenode.getLastUpdate())/1000;
+ return (Time.now() - alivenode.getLastUpdate())/1000;
}
private long getDfsUsed(DatanodeDescriptor alivenode) {
@@ -5468,6 +5518,7 @@ public class FSNamesystem implements Nam
getDelegationTokenSecretManager().verifyToken(identifier, password);
}
+ @Override
public boolean isGenStampInFuture(long genStamp) {
return (genStamp > getGenerationStamp());
}
@@ -5495,4 +5546,10 @@ public class FSNamesystem implements Nam
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker;
}
+
+ @Override
+ public boolean isAvoidingStaleDataNodesForWrite() {
+ return this.blockManager.getDatanodeManager()
+ .isAvoidingStaleDataNodesForWrite();
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Fri Oct 19 02:25:55 2012
@@ -71,7 +71,7 @@ public class FileChecksumServlets {
String tokenString = ugi.getTokens().iterator().next().encodeToUrlString();
dtParam = JspHelper.getDelegationTokenUrlParam(tokenString);
}
- String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
+ String addr = nn.getNameNodeAddressHostPortString();
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
return new URL(scheme, hostname, port,
@@ -127,7 +127,7 @@ public class FileChecksumServlets {
datanode, conf, getUGI(request, conf));
final ClientProtocol nnproxy = dfs.getNamenode();
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
- path, nnproxy, socketFactory, socketTimeout);
+ path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) {
writeXml(ioe, path, xml);
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Fri Oct 19 02:25:55 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
@@ -74,7 +73,7 @@ public class FileDataServlet extends Dfs
// Add namenode address to the url params
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
getServletContext());
- String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
+ String addr = nn.getNameNodeAddressHostPortString();
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
return new URL(scheme, hostname, port,
@@ -106,6 +105,7 @@ public class FileDataServlet extends Dfs
* GET http://<nn>:<port>/data[/<path>] HTTP/1.1
* }
*/
+ @Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response)
throws IOException {
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -36,9 +37,11 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.ComparisonChain;
@@ -50,7 +53,8 @@ import com.google.common.collect.Compari
* Note: this class is not thread-safe and should be externally
* synchronized.
*/
-class FileJournalManager implements JournalManager {
+@InterfaceAudience.Private
+public class FileJournalManager implements JournalManager {
private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
private final StorageDirectory sd;
@@ -78,6 +82,22 @@ class FileJournalManager implements Jour
public void close() throws IOException {}
@Override
+ public void format(NamespaceInfo ns) throws IOException {
+ // Formatting file journals is done by the StorageDirectory
+ // format code, since they may share their directory with
+ // checkpoints, etc.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasSomeData() {
+ // Not implemented for file journals; always throws. NOTE(review):
+ // presumably, as with format(), this is left to the StorageDirectory
+ // code because the directory may be shared with checkpoints, etc.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
synchronized public EditLogOutputStream startLogSegment(long txid)
throws IOException {
try {
@@ -147,7 +167,7 @@ class FileJournalManager implements Jour
* @return a list of remote edit logs
* @throws IOException if edit logs cannot be listed.
*/
- List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
+ public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@@ -165,6 +185,8 @@ class FileJournalManager implements Jour
}
}
+ Collections.sort(ret);
+
return ret;
}
@@ -178,7 +200,7 @@ class FileJournalManager implements Jour
* @throws IOException
* IOException thrown for invalid logDir
*/
- static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+ public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
return matchEditLogs(FileUtil.listFiles(logDir));
}
@@ -206,7 +228,7 @@ class FileJournalManager implements Jour
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
- new EditLogFile(f, startTxId, startTxId, true));
+ new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
@@ -220,18 +242,16 @@ class FileJournalManager implements Jour
@Override
synchronized public void selectInputStreams(
Collection<EditLogInputStream> streams, long fromTxId,
- boolean inProgressOk) {
- List<EditLogFile> elfs;
- try {
- elfs = matchEditLogs(sd.getCurrentDir());
- } catch (IOException e) {
- LOG.error("error listing files in " + this + ". " +
- "Skipping all edit logs in this directory.", e);
- return;
- }
+ boolean inProgressOk) throws IOException {
+ List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)");
+ addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+ }
+
+ static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
+ Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
for (EditLogFile elf : elfs) {
if (elf.isInProgress()) {
if (!inProgressOk) {
@@ -304,7 +324,7 @@ class FileJournalManager implements Jour
}
}
- List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+ public List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<EditLogFile> logFiles = Lists.newArrayList();
@@ -320,6 +340,32 @@ class FileJournalManager implements Jour
return logFiles;
}
+
+ public EditLogFile getLogFile(long startTxId) throws IOException {
+ return getLogFile(sd.getCurrentDir(), startTxId);
+ }
+
+ public static EditLogFile getLogFile(File dir, long startTxId)
+ throws IOException {
+ List<EditLogFile> files = matchEditLogs(dir);
+ List<EditLogFile> ret = Lists.newLinkedList();
+ for (EditLogFile elf : files) {
+ if (elf.getFirstTxId() == startTxId) {
+ ret.add(elf);
+ }
+ }
+
+ if (ret.isEmpty()) {
+ // no matches
+ return null;
+ } else if (ret.size() == 1) {
+ return ret.get(0);
+ } else {
+ throw new IllegalStateException("More than one log segment in " +
+ dir + " starting at txid " + startTxId + ": " +
+ Joiner.on(", ").join(ret));
+ }
+ }
@Override
public String toString() {
@@ -329,7 +375,8 @@ class FileJournalManager implements Jour
/**
* Record of an edit log that has been located and had its filename parsed.
*/
- static class EditLogFile {
+ @InterfaceAudience.Private
+ public static class EditLogFile {
private File file;
private final long firstTxId;
private long lastTxId;
@@ -339,6 +386,7 @@ class FileJournalManager implements Jour
final static Comparator<EditLogFile> COMPARE_BY_START_TXID
= new Comparator<EditLogFile>() {
+ @Override
public int compare(EditLogFile a, EditLogFile b) {
return ComparisonChain.start()
.compare(a.getFirstTxId(), b.getFirstTxId())
@@ -361,17 +409,20 @@ class FileJournalManager implements Jour
assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
assert file != null;
+ Preconditions.checkArgument(!isInProgress ||
+ lastTxId == HdfsConstants.INVALID_TXID);
+
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.file = file;
this.isInProgress = isInProgress;
}
- long getFirstTxId() {
+ public long getFirstTxId() {
return firstTxId;
}
- long getLastTxId() {
+ public long getLastTxId() {
return lastTxId;
}
@@ -384,17 +435,17 @@ class FileJournalManager implements Jour
* This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is.
*/
- void validateLog() throws IOException {
+ public void validateLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader();
}
- boolean isInProgress() {
+ public boolean isInProgress() {
return isInProgress;
}
- File getFile() {
+ public File getFile() {
return file;
}
@@ -407,7 +458,7 @@ class FileJournalManager implements Jour
renameSelf(".corrupt");
}
- void moveAsideEmptyFile() throws IOException {
+ public void moveAsideEmptyFile() throws IOException {
assert lastTxId == HdfsConstants.INVALID_TXID;
renameSelf(".empty");
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Fri Oct 19 02:25:55 2012
@@ -42,6 +42,7 @@ public class FsckServlet extends DfsServ
private static final long serialVersionUID = 1L;
/** Handle fsck request */
+ @Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws IOException {
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Fri Oct 19 02:25:55 2012
@@ -79,6 +79,7 @@ public class GetImageServlet extends Htt
private static Set<Long> currentlyDownloadingCheckpoints =
Collections.<Long>synchronizedSet(new HashSet<Long>());
+ @Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response
) throws ServletException, IOException {
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Oct 19 02:25:55 2012
@@ -96,13 +96,6 @@ abstract class INode implements Comparab
}
}
- protected INode() {
- name = null;
- parent = null;
- modificationTime = 0;
- accessTime = 0;
- }
-
INode(PermissionStatus permissions, long mTime, long atime) {
this.name = null;
this.parent = null;
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Fri Oct 19 02:25:55 2012
@@ -65,6 +65,7 @@ class INodeDirectory extends INode {
/**
* Check whether it's a directory
*/
+ @Override
public boolean isDirectory() {
return true;
}
@@ -422,6 +423,7 @@ class INodeDirectory extends INode {
return children;
}
+ @Override
int collectSubtreeBlocksAndClear(List<Block> v) {
int total = 1;
if (children == null) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Fri Oct 19 02:25:55 2012
@@ -71,6 +71,7 @@ class INodeDirectoryWithQuota extends IN
/** Get this directory's namespace quota
* @return this directory's namespace quota
*/
+ @Override
long getNsQuota() {
return nsQuota;
}
@@ -78,6 +79,7 @@ class INodeDirectoryWithQuota extends IN
/** Get this directory's diskspace quota
* @return this directory's diskspace quota
*/
+ @Override
long getDsQuota() {
return dsQuota;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Oct 19 02:25:55 2012
@@ -59,17 +59,19 @@ public class INodeFile extends INode imp
* Since this is a file,
* the {@link FsAction#EXECUTE} action, if any, is ignored.
*/
+ @Override
void setPermission(FsPermission permission) {
super.setPermission(permission.applyUMask(UMASK));
}
+ @Override
boolean isDirectory() {
return false;
}
/** @return the replication factor of the file. */
@Override
- public short getReplication() {
+ public short getBlockReplication() {
return (short) ((header & HEADERMASK) >> BLOCKBITS);
}
@@ -138,6 +140,7 @@ public class INodeFile extends INode imp
this.blocks[idx] = blk;
}
+ @Override
int collectSubtreeBlocksAndClear(List<Block> v) {
parent = null;
if(blocks != null && v != null) {
@@ -212,7 +215,7 @@ public class INodeFile extends INode imp
isUnderConstruction()) {
size += getPreferredBlockSize() - blkArr[blkArr.length-1].getNumBytes();
}
- return size * getReplication();
+ return size * getBlockReplication();
}
/**
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Fri Oct 19 02:25:55 2012
@@ -104,7 +104,7 @@ class INodeFileUnderConstruction extends
"non-complete blocks! Blocks are: " + blocksAsString();
INodeFile obj = new INodeFile(getPermissionStatus(),
getBlocks(),
- getReplication(),
+ getBlockReplication(),
getModificationTime(),
getModificationTime(),
getPreferredBlockSize());
@@ -147,6 +147,7 @@ class INodeFileUnderConstruction extends
* Convert the last block of the file to an under-construction block.
* Set its locations.
*/
+ @Override
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeDescriptor[] targets)
throws IOException {
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Fri Oct 19 02:25:55 2012
@@ -40,6 +40,7 @@ public class INodeSymlink extends INode
setAccessTime(atime);
}
+ @Override
public boolean isLink() {
return true;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Oct 19 02:25:55 2012
@@ -19,10 +19,11 @@ package org.apache.hadoop.hdfs.server.na
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
/**
* A JournalManager is responsible for managing a single place of storing
@@ -33,7 +34,15 @@ import org.apache.hadoop.classification.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface JournalManager extends Closeable {
+public interface JournalManager extends Closeable, FormatConfirmable,
+ LogsPurgeable {
+
+ /**
+ * Format the underlying storage, removing any previously
+ * stored data.
+ */
+ void format(NamespaceInfo ns) throws IOException;
+
/**
* Begin writing to a new segment of the log stream, which starts at
* the given transaction ID.
@@ -46,36 +55,12 @@ public interface JournalManager extends
*/
void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
- /**
- * Get a list of edit log input streams. The list will start with the
- * stream that contains fromTxnId, and continue until the end of the journal
- * being managed.
- *
- * @param fromTxnId the first transaction id we want to read
- * @param inProgressOk whether or not in-progress streams should be returned
- *
- * @return a list of streams
- */
- void selectInputStreams(Collection<EditLogInputStream> streams,
- long fromTxnId, boolean inProgressOk);
-
/**
* Set the amount of memory that this stream should use to buffer edits
*/
void setOutputBufferCapacity(int size);
/**
- * The JournalManager may archive/purge any logs for transactions less than
- * or equal to minImageTxId.
- *
- * @param minTxIdToKeep the earliest txid that must be retained after purging
- * old logs
- * @throws IOException if purging fails
- */
- void purgeLogsOlderThan(long minTxIdToKeep)
- throws IOException;
-
- /**
* Recover segments which have not been finalized.
*/
void recoverUnfinalizedSegments() throws IOException;
@@ -83,6 +68,7 @@ public interface JournalManager extends
/**
* Close the journal manager, freeing any resources it may hold.
*/
+ @Override
void close() throws IOException;
/**