Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/31 21:31:58 UTC
svn commit: r543225 - in /lucene/hadoop/branches/branch-0.13: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/dfs/
Author: cutting
Date: Thu May 31 12:31:57 2007
New Revision: 543225
URL: http://svn.apache.org/viewvc?view=rev&rev=543225
Log:
Merge -r 543206:543222 from trunk to 0.13 branch. Fixes: HADOOP-1242 and HADOOP-1332.
Modified:
lucene/hadoop/branches/branch-0.13/CHANGES.txt
lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java
lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java
lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java
lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java
lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java
Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/CHANGES.txt?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.13/CHANGES.txt Thu May 31 12:31:57 2007
@@ -430,6 +430,12 @@
AlreadyBeingCreatedException when wrapped as a RemoteException.
(Hairong Kuang via tomwhite)
+129. HADOOP-1242. Improve handling of DFS upgrades.
+ (Konstantin Shvachko via cutting)
+
+130. HADOOP-1332. Fix so that TaskTracker exits reliably during unit
+ tests on Windows. (omalley via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java Thu May 31 12:31:57 2007
@@ -11,7 +11,6 @@
import org.apache.hadoop.dfs.FSConstants.StartupOption;
import org.apache.hadoop.dfs.FSConstants.NodeType;
-import org.apache.hadoop.dfs.FSImage.NameNodeFile;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.fs.FileUtil.HardLink;
@@ -164,6 +163,21 @@
File oldF = new File(sd.root, "storage");
if (!oldF.exists())
return false;
+ // check the layout version inside the storage file
+ // Lock and Read old storage file
+ RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+ if (oldFile == null)
+ throw new IOException("Cannot read file: " + oldF);
+ FileLock oldLock = oldFile.getChannel().tryLock();
+ try {
+ oldFile.seek(0);
+ int oldVersion = oldFile.readInt();
+ if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+ return false;
+ } finally {
+ oldLock.release();
+ oldFile.close();
+ }
// check consistency of the old storage
File oldDataDir = new File(sd.root, "data");
if (!oldDataDir.exists())
@@ -206,13 +220,14 @@
FileLock oldLock = oldFile.getChannel().tryLock();
if (oldLock == null)
throw new IOException("Cannot lock file: " + oldF);
+ String oldStorageID = "";
try {
oldFile.seek(0);
int oldVersion = oldFile.readInt();
if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
throw new IncorrectVersionException(oldVersion, "file " + oldF,
LAST_PRE_UPGRADE_LAYOUT_VERSION);
- String oldStorageID = org.apache.hadoop.io.UTF8.readString(oldFile);
+ oldStorageID = org.apache.hadoop.io.UTF8.readString(oldFile);
// check new storage
File newDataDir = sd.getCurrentDir();
@@ -221,14 +236,8 @@
throw new IOException("Version file already exists: " + versionF);
if (newDataDir.exists()) // somebody created current dir manually
deleteDir(newDataDir);
- // Write new layout
+ // move "data" to "current"
rename(oldDataDir, newDataDir);
-
- this.layoutVersion = FSConstants.LAYOUT_VERSION;
- this.namespaceID = nsInfo.getNamespaceID();
- this.cTime = 0;
- this.storageID = oldStorageID;
- sd.write();
// close and unlock old file
} finally {
oldLock.release();
@@ -236,6 +245,13 @@
}
// move old storage file into current dir
rename(oldF, new File(sd.getCurrentDir(), "storage"));
+
+ // Write new version file
+ this.layoutVersion = FSConstants.LAYOUT_VERSION;
+ this.namespaceID = nsInfo.getNamespaceID();
+ this.cTime = 0;
+ this.storageID = oldStorageID;
+ sd.write();
LOG.info("Conversion of " + oldF + " is complete.");
}
@@ -408,5 +424,23 @@
for(int i = 0; i < blockNames.length; i++)
linkBlocks(new File(from, blockNames[i]), new File(to, blockNames[i]));
+ }
+
+ protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
+ File oldF = new File(rootDir, "storage");
+ if (oldF.exists())
+ return;
+ // recreate old storage file to let pre-upgrade versions fail
+ if (!oldF.createNewFile())
+ throw new IOException("Cannot create file " + oldF);
+ RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+ if (oldFile == null)
+ throw new IOException("Cannot read file: " + oldF);
+ // write new version into old storage file
+ try {
+ writeCorruptedData(oldFile);
+ } finally {
+ oldFile.close();
+ }
}
}
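The guard added at the top of isConversionNeeded() above peeks at the leading int of the old "storage" file before deciding whether a layout conversion is required. The following standalone sketch shows the same probe in isolation; the class name and the concrete value of LAST_PRE_UPGRADE_LAYOUT_VERSION are illustrative assumptions (the real constant lives in FSConstants and is not part of this diff), and the patch's follow-on consistency checks are omitted:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Illustrative sketch only; not part of the patch.
    class StorageVersionProbe {
      // Assumed sentinel. Layout versions are negative and grow more
      // negative with each format change, so anything below this value
      // was written by an already-upgraded node.
      static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;

      static boolean needsConversion(File root) throws IOException {
        File oldF = new File(root, "storage");
        if (!oldF.exists())
          return false;                        // nothing old-format here
        RandomAccessFile oldFile = new RandomAccessFile(oldF, "r");
        try {
          oldFile.seek(0);
          int oldVersion = oldFile.readInt();  // first 4 bytes: layout version
          // A post-upgrade (more negative) version means the file is only
          // the intentionally corrupted placeholder, so no conversion.
          return oldVersion >= LAST_PRE_UPGRADE_LAYOUT_VERSION;
        } finally {
          oldFile.close();
        }
      }
    }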
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java Thu May 31 12:31:57 2007
@@ -27,6 +27,7 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
@@ -435,7 +436,21 @@
boolean isConversionNeeded(StorageDirectory sd) throws IOException {
File oldImageDir = new File(sd.root, "image");
if (!oldImageDir.exists())
- return false;
+ throw new InconsistentFSStateException(sd.root,
+ oldImageDir + " does not exist.");
+ // check the layout version inside the image file
+ File oldF = new File(oldImageDir, "fsimage");
+ RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+ if (oldFile == null)
+ throw new IOException("Cannot read file: " + oldF);
+ try {
+ oldFile.seek(0);
+ int oldVersion = oldFile.readInt();
+ if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+ return false;
+ } finally {
+ oldFile.close();
+ }
// check consistency of the old storage
if (!oldImageDir.isDirectory())
throw new InconsistentFSStateException(sd.root,
@@ -492,8 +507,8 @@
needReformat = true;
} else {
sd.write();
- LOG.info("Conversion of " + oldImage + " is complete.");
}
+ LOG.info("Conversion of " + oldImage + " is complete.");
return needReformat;
}
@@ -958,6 +973,27 @@
node.setRemaining(remaining);
node.setLastUpdate(lastUpdate);
node.setXceiverCount(xceiverCount);
+ }
+ }
+
+ protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
+ File oldImageDir = new File(rootDir, "image");
+ if (!oldImageDir.exists())
+ if (!oldImageDir.mkdir())
+ throw new IOException("Cannot create directory " + oldImageDir);
+ File oldImage = new File(oldImageDir, "fsimage");
+ if (!oldImage.exists())
+ // recreate old image file to let pre-upgrade versions fail
+ if (!oldImage.createNewFile())
+ throw new IOException("Cannot create file " + oldImage);
+ RandomAccessFile oldFile = new RandomAccessFile(oldImage, "rws");
+ if (oldFile == null)
+ throw new IOException("Cannot read file: " + oldImage);
+ // write new version into old image file
+ try {
+ writeCorruptedData(oldFile);
+ } finally {
+ oldFile.close();
}
}
}
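Together with corruptPreUpgradeStorage() above, a converted name-node directory ends up looking roughly like this (a sketch for orientation, not the output of any tool):

    ${dfs.name.dir}/
      current/          new-layout files: fsimage, edits, VERSION, ...
      image/
        fsimage         placeholder, intentionally corrupted so that
                        pre-0.13 namenodes refuse to start

This is also why a missing "image" directory is now an InconsistentFSStateException rather than a silent "no conversion needed": after HADOOP-1242 every valid storage directory, old layout or new, carries one.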
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java Thu May 31 12:31:57 2007
@@ -157,6 +157,7 @@
* @throws IOException
*/
void write() throws IOException {
+ corruptPreUpgradeStorage(root);
write(getVersionFile());
}
@@ -520,5 +521,21 @@
return "NS-" + Integer.toString(storage.getNamespaceID())
+ "-" + Integer.toString(storage.getLayoutVersion())
+ "-" + Long.toString(storage.getCTime());
+ }
+
+ // Pre-upgrade version compatibility
+ protected abstract void corruptPreUpgradeStorage(File rootDir) throws IOException;
+
+ protected void writeCorruptedData(RandomAccessFile file) throws IOException {
+ final String messageForPreUpgradeVersion =
+ "\nThis file is INTENTIONALLY CORRUPTED so that versions\n"
+ + "of Hadoop prior to 0.13 (which are incompatible\n"
+ + "with this directory layout) will fail to start.\n";
+
+ file.seek(0);
+ file.writeInt(FSConstants.LAYOUT_VERSION);
+ org.apache.hadoop.io.UTF8.writeString(file, "");
+ file.writeBytes(messageForPreUpgradeVersion);
+ file.getFD().sync();
}
}
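writeCorruptedData() therefore lays the file out as a 4-byte int holding the new FSConstants.LAYOUT_VERSION, an empty UTF8 string, and then the human-readable banner. A small reader sketch, assuming Hadoop's org.apache.hadoop.io.UTF8 is on the classpath (the class name and main() harness are made up for illustration):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    class InspectCorruptedStorage {
      public static void main(String[] args) throws IOException {
        RandomAccessFile f = new RandomAccessFile(new File(args[0]), "r");
        try {
          int version = f.readInt();                // new LAYOUT_VERSION
          String id = org.apache.hadoop.io.UTF8.readString(f);  // ""
          byte[] rest = new byte[(int)(f.length() - f.getFilePointer())];
          f.readFully(rest);                        // the warning banner
          System.out.println("version=" + version + " storageID='" + id
                             + "'\n" + new String(rest));
        } finally {
          f.close();
        }
      }
    }

A pre-0.13 daemon reads that leading int, finds a layout version it does not recognize, and aborts at startup, which is exactly the intent of the banner text.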
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu May 31 12:31:57 2007
@@ -388,16 +388,19 @@
*/
private void runChild(String[] args, File dir) throws IOException {
this.process = Runtime.getRuntime().exec(args, null, dir);
+
+ Thread logStdErrThread = null;
+ Thread logStdOutThread = null;
try {
- new Thread() {
- public void run() {
- // Copy stderr of the process
- logStream(process.getErrorStream(), taskStdErrLogWriter);
- }
- }.start();
-
- // Copy stderr of the process; normally empty
- logStream(process.getInputStream(), taskStdOutLogWriter);
+ // Copy stderr of the child-process via a thread
+ logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"),
+ process.getErrorStream(),
+ taskStdErrLogWriter);
+
+ // Copy stdout of the child-process via a thread
+ logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"),
+ process.getInputStream(),
+ taskStdOutLogWriter);
int exit_code = process.waitFor();
@@ -410,8 +413,21 @@
throw new IOException(e.toString());
} finally {
kill();
- taskStdOutLogWriter.close();
- taskStdErrLogWriter.close();
+
+ // Kill both stdout/stderr copying threads
+ if (logStdErrThread != null) {
+ logStdErrThread.interrupt();
+ try {
+ logStdErrThread.join();
+ } catch (InterruptedException ie) {}
+ }
+
+ if (logStdOutThread != null) {
+ logStdOutThread.interrupt();
+ try {
+ logStdOutThread.join();
+ } catch (InterruptedException ie) {}
+ }
}
}
@@ -426,24 +442,47 @@
}
/**
+ * Spawn a new thread to copy the child-jvm's stdout/stderr streams
+ * via a {@link TaskLog.Writer}
+ *
+ * @param threadName thread name
+ * @param stream child-jvm's stdout/stderr stream
+ * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data
+ * @return the newly created thread
*/
- private void logStream(InputStream output, TaskLog.Writer taskLog) {
- try {
- byte[] buf = new byte[512];
- int n = 0;
- while ((n = output.read(buf, 0, buf.length)) != -1) {
- // Write out to the task's log
- taskLog.write(buf, 0, n);
- }
- } catch (IOException e) {
- LOG.warn(t.getTaskId()+" Error reading child output", e);
- } finally {
- try {
- output.close();
- } catch (IOException e) {
- LOG.warn(t.getTaskId()+" Error closing child output", e);
+ private Thread logStream(String threadName,
+ final InputStream stream,
+ final TaskLog.Writer taskLog) {
+ Thread loggerThread = new Thread() {
+ public void run() {
+ try {
+ byte[] buf = new byte[512];
+ while (!Thread.interrupted()) {
+ while (stream.available() > 0) {
+ int n = stream.read(buf, 0, buf.length);
+ taskLog.write(buf, 0, n);
+ }
+ Thread.sleep(1000);
+ }
+ } catch (IOException e) {
+ LOG.warn(t.getTaskId()+" Error reading child output", e);
+ } catch (InterruptedException e) {
+ // expected
+ } finally {
+ try {
+ stream.close();
+ taskLog.close();
+ } catch (IOException e) {
+ LOG.warn(t.getTaskId()+" Error closing child output", e);
+ }
+ }
}
- }
+ };
+ loggerThread.setName(threadName);
+ loggerThread.setDaemon(true);
+ loggerThread.start();
+
+ return loggerThread;
}
}
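The rewritten logStream() is the heart of the HADOOP-1332 fix: a thread blocked in InputStream.read() on a child-process pipe cannot be interrupted, which is what left the copier threads (and with them the TaskTracker JVM) hanging on Windows. The new loop only reads bytes that available() reports as already buffered and sleeps in between, so the interrupt()/join() in runChild()'s finally block reliably wakes and ends it. The same pattern in isolation, with made-up names:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Illustrative sketch of the interruptible copier pattern; not the patch.
    class StreamPump {
      static Thread pump(final InputStream in, final OutputStream out,
                         String name) {
        Thread t = new Thread() {
          public void run() {
            byte[] buf = new byte[512];
            try {
              while (!Thread.interrupted()) {
                // read only what is already buffered, so read() never blocks
                while (in.available() > 0) {
                  int n = in.read(buf, 0, Math.min(buf.length, in.available()));
                  if (n < 0) return;
                  out.write(buf, 0, n);
                }
                Thread.sleep(1000);       // interrupt() wakes the thread here
              }
            } catch (InterruptedException expected) {
              // parent asked us to stop
            } catch (IOException e) {
              e.printStackTrace();
            } finally {
              try { in.close(); } catch (IOException ignored) {}
            }
          }
        };
        t.setName(name);
        t.setDaemon(true);                // never keep the JVM alive
        t.start();
        return t;
      }
    }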
Modified: lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java Thu May 31 12:31:57 2007
@@ -23,11 +23,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.FSConstants.NodeType;
import static org.apache.hadoop.dfs.FSConstants.NodeType.NAME_NODE;
import static org.apache.hadoop.dfs.FSConstants.NodeType.DATA_NODE;
import org.apache.hadoop.dfs.FSConstants.StartupOption;
-import org.apache.hadoop.fs.Path;
/**
* This test ensures the appropriate response from the system when
@@ -82,7 +80,6 @@
* This test attempts to finalize the NameNode and DataNode.
*/
public void testFinalize() throws Exception {
- File[] baseDirs;
UpgradeUtilities.initialize();
for (int numDirs = 1; numDirs <= 2; numDirs++) {
Modified: lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java Thu May 31 12:31:57 2007
@@ -250,16 +250,26 @@
LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
switch (nodeType) {
case NAME_NODE:
- localFS.copyToLocalFile(
- new Path(namenodeStorage.toString(), "current"),
+ localFS.copyToLocalFile(new Path(namenodeStorage.toString(), "current"),
new Path(newDir.toString()),
false);
+ Path newImgDir = new Path(newDir.getParent(), "image");
+ if (!localFS.exists(newImgDir))
+ localFS.copyToLocalFile(
+ new Path(namenodeStorage.toString(), "image"),
+ newImgDir,
+ false);
break;
case DATA_NODE:
- localFS.copyToLocalFile(
- new Path(datanodeStorage.toString(), "current"),
+ localFS.copyToLocalFile(new Path(datanodeStorage.toString(), "current"),
new Path(newDir.toString()),
false);
+ Path newStorageFile = new Path(newDir.getParent(), "storage");
+ if (!localFS.exists(newStorageFile))
+ localFS.copyToLocalFile(
+ new Path(datanodeStorage.toString(), "storage"),
+ newStorageFile,
+ false);
break;
}
retVal[i] = newDir;