Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/12/05 20:22:25 UTC
svn commit: r1417596 [4/6] - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/
src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/
src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Wed Dec 5 19:22:17 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -368,30 +369,36 @@ public class EditLogFileInputStream exte
@Override
public InputStream getInputStream() throws IOException {
- HttpURLConnection connection = (HttpURLConnection)
- SecurityUtil.openSecureHttpConnection(url);
-
- if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
- throw new HttpGetFailedException(
- "Fetch of " + url +
- " failed with status code " + connection.getResponseCode() +
- "\nResponse message:\n" + connection.getResponseMessage(),
- connection);
- }
-
- String contentLength = connection.getHeaderField(CONTENT_LENGTH);
- if (contentLength != null) {
- advertisedSize = Long.parseLong(contentLength);
- if (advertisedSize <= 0) {
- throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
- contentLength);
- }
- } else {
- throw new IOException(CONTENT_LENGTH + " header is not provided " +
- "by the server when trying to fetch " + url);
- }
-
- return connection.getInputStream();
+ return SecurityUtil.doAsCurrentUser(
+ new PrivilegedExceptionAction<InputStream>() {
+ @Override
+ public InputStream run() throws IOException {
+ HttpURLConnection connection = (HttpURLConnection)
+ SecurityUtil.openSecureHttpConnection(url);
+
+ if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ throw new HttpGetFailedException(
+ "Fetch of " + url +
+ " failed with status code " + connection.getResponseCode() +
+ "\nResponse message:\n" + connection.getResponseMessage(),
+ connection);
+ }
+
+ String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+ if (contentLength != null) {
+ advertisedSize = Long.parseLong(contentLength);
+ if (advertisedSize <= 0) {
+ throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+ contentLength);
+ }
+ } else {
+ throw new IOException(CONTENT_LENGTH + " header is not provided " +
+ "by the server when trying to fetch " + url);
+ }
+
+ return connection.getInputStream();
+ }
+ });
}
@Override
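
For context, the doAs idiom wrapped around the fetch above runs an action under a specific set of security credentials. A minimal sketch of the pattern (the fetchUrl() helper is hypothetical; SecurityUtil.doAsCurrentUser is the same entry point the patch uses):

    InputStream in = SecurityUtil.doAsCurrentUser(
        new PrivilegedExceptionAction<InputStream>() {
          @Override
          public InputStream run() throws IOException {
            // runs with the current UGI's credentials, so the secure
            // HTTP connection can authenticate (e.g. via SPNEGO)
            return fetchUrl(); // hypothetical helper doing the actual I/O
          }
        });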
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Dec 5 19:22:17 2012
@@ -176,7 +176,7 @@ public class EditLogFileOutputStream ext
* accumulates new log records while readyBuffer will be flushed and synced.
*/
@Override
- public void flushAndSync() throws IOException {
+ public void flushAndSync(boolean durable) throws IOException {
if (fp == null) {
throw new IOException("Trying to use aborted output stream");
}
@@ -186,7 +186,7 @@ public class EditLogFileOutputStream ext
}
preallocate(); // preallocate file if necessary
doubleBuf.flushTo(fp);
- if (!shouldSkipFsyncForTests) {
+ if (durable && !shouldSkipFsyncForTests) {
fc.force(false); // metadata updates not needed
}
}
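
Note that fc.force(false) syncs file data but not metadata; java.nio.channels.FileChannel.force(boolean metaData) would also flush metadata such as the modification time when passed true. A small standalone sketch of the distinction (file name arbitrary):

    FileChannel fc = new RandomAccessFile("edits.tmp", "rw").getChannel();
    fc.write(ByteBuffer.wrap(new byte[] { 1, 2, 3 }));
    fc.force(false); // force data to disk; skip the (costlier) metadata sync
    fc.close();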
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Wed Dec 5 19:22:17 2012
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Tim
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.jasper.compiler.JspUtil;
/**
* A generic abstract class to support journaling of edits logs into
@@ -92,18 +93,24 @@ public abstract class EditLogOutputStrea
/**
* Flush and sync all data that is ready to be flushed
* {@link #setReadyToFlush()} into underlying persistent store.
+ * @param durable if true, the edits should be made truly durable before
+ * returning
* @throws IOException
*/
- abstract protected void flushAndSync() throws IOException;
+ abstract protected void flushAndSync(boolean durable) throws IOException;
/**
* Flush data to persistent store.
* Collect sync metrics.
*/
public void flush() throws IOException {
+ flush(true);
+ }
+
+ public void flush(boolean durable) throws IOException {
numSync++;
long start = now();
- flushAndSync();
+ flushAndSync(durable);
long end = now();
totalTimeSync += (end - start);
}
@@ -132,4 +139,12 @@ public abstract class EditLogOutputStrea
protected long getNumSync() {
return numSync;
}
+
+ /**
+ * @return a short HTML snippet suitable for describing the current
+ * status of the stream
+ */
+ public String generateHtmlReport() {
+ return JspUtil.escapeXml(this.toString());
+ }
}
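
The new flush(boolean) overload lets callers push buffered edits through without paying for a disk sync on every call, deferring durability until it is actually required. A hedged usage sketch against the API above (elos stands in for any concrete EditLogOutputStream):

    elos.setReadyToFlush();
    elos.flush(false); // hand bytes to the stream, but skip the sync
    // ... later, when the edits must survive a crash:
    elos.setReadyToFlush();
    elos.flush(true);  // same as the no-arg flush(): truly durable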
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Wed Dec 5 19:22:17 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -34,7 +35,8 @@ import com.google.common.base.Preconditi
* to progress concurrently to flushes without allocating new buffers each
* time.
*/
-class EditsDoubleBuffer {
+@InterfaceAudience.Private
+public class EditsDoubleBuffer {
private TxnBuffer bufCurrent; // current buffer for writing
private TxnBuffer bufReady; // buffer ready for flushing
@@ -51,11 +53,11 @@ class EditsDoubleBuffer {
bufCurrent.writeOp(op);
}
- void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
bufCurrent.write(bytes, offset, length);
}
- void close() throws IOException {
+ public void close() throws IOException {
Preconditions.checkNotNull(bufCurrent);
Preconditions.checkNotNull(bufReady);
@@ -69,7 +71,7 @@ class EditsDoubleBuffer {
bufCurrent = bufReady = null;
}
- void setReadyToFlush() {
+ public void setReadyToFlush() {
assert isFlushed() : "previous data not flushed yet";
TxnBuffer tmp = bufReady;
bufReady = bufCurrent;
@@ -80,12 +82,12 @@ class EditsDoubleBuffer {
* Writes the content of the "ready" buffer to the given output stream,
* and resets it. Does not swap any buffers.
*/
- void flushTo(OutputStream out) throws IOException {
+ public void flushTo(OutputStream out) throws IOException {
bufReady.writeTo(out); // write data to file
bufReady.reset(); // erase all data in the buffer
}
- boolean shouldForceSync() {
+ public boolean shouldForceSync() {
return bufCurrent.size() >= initBufferSize;
}
@@ -120,6 +122,12 @@ class EditsDoubleBuffer {
return bufReady.numTxns;
}
+ /**
+ * @return the number of bytes that are ready to be flushed
+ */
+ public int countReadyBytes() {
+ return bufReady.size();
+ }
private static class TxnBuffer extends DataOutputBuffer {
long firstTxId;
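
EditsDoubleBuffer alternates two buffers so writers never block on I/O: new ops accumulate in bufCurrent while bufReady drains. A sketch of the lifecycle using the methods made public above (the int-capacity constructor is assumed from the initBufferSize field):

    EditsDoubleBuffer buf = new EditsDoubleBuffer(512 * 1024);
    byte[] op = new byte[] { 1, 2, 3 };  // stand-in for a serialized edit op
    OutputStream out = new ByteArrayOutputStream();
    buf.writeRaw(op, 0, op.length);      // appends to bufCurrent
    buf.setReadyToFlush();               // swap bufCurrent and bufReady
    buf.flushTo(out);                    // drain bufReady; new writes keep landing in the other buffer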
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Dec 5 19:22:17 2012
@@ -1107,6 +1107,7 @@ public class FSEditLog implements LogsPu
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {
// All journals have failed, it is handled in logSync.
+ // TODO: are we sure this is OK?
}
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Wed Dec 5 19:22:17 2012
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.ComparisonChain;
@@ -51,7 +53,8 @@ import com.google.common.collect.Compari
* Note: this class is not thread-safe and should be externally
* synchronized.
*/
-class FileJournalManager implements JournalManager {
+@InterfaceAudience.Private
+public class FileJournalManager implements JournalManager {
private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
private final StorageDirectory sd;
@@ -164,7 +167,7 @@ class FileJournalManager implements Jour
* @return a list of remote edit logs
* @throws IOException if edit logs cannot be listed.
*/
- List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
+ public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@@ -182,6 +185,8 @@ class FileJournalManager implements Jour
}
}
+ Collections.sort(ret);
+
return ret;
}
@@ -195,7 +200,7 @@ class FileJournalManager implements Jour
* @throws IOException
* IOException thrown for invalid logDir
*/
- static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+ public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
return matchEditLogs(FileUtil.listFiles(logDir));
}
@@ -223,7 +228,7 @@ class FileJournalManager implements Jour
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
- new EditLogFile(f, startTxId, startTxId, true));
+ new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
@@ -237,15 +242,8 @@ class FileJournalManager implements Jour
@Override
synchronized public void selectInputStreams(
Collection<EditLogInputStream> streams, long fromTxId,
- boolean inProgressOk) {
- List<EditLogFile> elfs;
- try {
- elfs = matchEditLogs(sd.getCurrentDir());
- } catch (IOException e) {
- LOG.error("error listing files in " + this + ". " +
- "Skipping all edit logs in this directory.", e);
- return;
- }
+ boolean inProgressOk) throws IOException {
+ List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)");
@@ -326,7 +324,7 @@ class FileJournalManager implements Jour
}
}
- List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+ public List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<EditLogFile> logFiles = Lists.newArrayList();
@@ -342,6 +340,32 @@ class FileJournalManager implements Jour
return logFiles;
}
+
+ public EditLogFile getLogFile(long startTxId) throws IOException {
+ return getLogFile(sd.getCurrentDir(), startTxId);
+ }
+
+ public static EditLogFile getLogFile(File dir, long startTxId)
+ throws IOException {
+ List<EditLogFile> files = matchEditLogs(dir);
+ List<EditLogFile> ret = Lists.newLinkedList();
+ for (EditLogFile elf : files) {
+ if (elf.getFirstTxId() == startTxId) {
+ ret.add(elf);
+ }
+ }
+
+ if (ret.isEmpty()) {
+ // no matches
+ return null;
+ } else if (ret.size() == 1) {
+ return ret.get(0);
+ } else {
+ throw new IllegalStateException("More than one log segment in " +
+ dir + " starting at txid " + startTxId + ": " +
+ Joiner.on(", ").join(ret));
+ }
+ }
@Override
public String toString() {
@@ -351,7 +375,8 @@ class FileJournalManager implements Jour
/**
* Record of an edit log that has been located and had its filename parsed.
*/
- static class EditLogFile {
+ @InterfaceAudience.Private
+ public static class EditLogFile {
private File file;
private final long firstTxId;
private long lastTxId;
@@ -384,17 +409,20 @@ class FileJournalManager implements Jour
assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
assert file != null;
+ Preconditions.checkArgument(!isInProgress ||
+ lastTxId == HdfsConstants.INVALID_TXID);
+
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.file = file;
this.isInProgress = isInProgress;
}
- long getFirstTxId() {
+ public long getFirstTxId() {
return firstTxId;
}
- long getLastTxId() {
+ public long getLastTxId() {
return lastTxId;
}
@@ -407,17 +435,17 @@ class FileJournalManager implements Jour
* This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is.
*/
- void validateLog() throws IOException {
+ public void validateLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader();
}
- boolean isInProgress() {
+ public boolean isInProgress() {
return isInProgress;
}
- File getFile() {
+ public File getFile() {
return file;
}
@@ -430,7 +458,7 @@ class FileJournalManager implements Jour
renameSelf(".corrupt");
}
- void moveAsideEmptyFile() throws IOException {
+ public void moveAsideEmptyFile() throws IOException {
assert lastTxId == HdfsConstants.INVALID_TXID;
renameSelf(".empty");
}
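
The new getLogFile() helpers look up a segment by its start transaction id, returning null when nothing starts there and failing fast if more than one segment does. A brief usage sketch (directory and txid are hypothetical):

    EditLogFile elf =
        FileJournalManager.getLogFile(new File("/data/nn/current"), 42L);
    if (elf != null && elf.isInProgress()) {
      elf.validateLog();             // scan the file to discover its real lastTxId
      long last = elf.getLastTxId();
    }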
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Wed Dec 5 19:22:17 2012
@@ -26,7 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -146,7 +146,7 @@ public class JournalSet implements Journ
return journal;
}
- private boolean isDisabled() {
+ boolean isDisabled() {
return disabled;
}
@@ -164,8 +164,12 @@ public class JournalSet implements Journ
return required;
}
}
-
- private List<JournalAndStream> journals = Lists.newArrayList();
+
+ // COW implementation is necessary since some users (eg the web ui) call
+ // getAllJournalStreams() and then iterate. Since this is rarely
+ // mutated, there is no performance concern.
+ private List<JournalAndStream> journals =
+ new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
final int minimumRedundantJournals;
JournalSet(int minimumRedundantResources) {
@@ -241,8 +245,20 @@ public class JournalSet implements Journ
LOG.info("Skipping jas " + jas + " since it's disabled");
continue;
}
- jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+ try {
+ jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+ } catch (IOException ioe) {
+ LOG.warn("Unable to determine input streams from " + jas.getManager() +
+ ". Skipping.", ioe);
+ }
}
+ chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
+ }
+
+ public static void chainAndMakeRedundantStreams(
+ Collection<EditLogInputStream> outStreams,
+ PriorityQueue<EditLogInputStream> allStreams,
+ long fromTxId, boolean inProgressOk) {
// We want to group together all the streams that start on the same start
// transaction ID. To do this, we maintain an accumulator (acc) of all
// the streams we've seen at a given start transaction ID. When we see a
@@ -260,7 +276,7 @@ public class JournalSet implements Journ
if (accFirstTxId == elis.getFirstTxId()) {
acc.add(elis);
} else if (accFirstTxId < elis.getFirstTxId()) {
- streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+ outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) {
@@ -271,7 +287,7 @@ public class JournalSet implements Journ
}
}
if (!acc.isEmpty()) {
- streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+ outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
}
}
@@ -453,12 +469,12 @@ public class JournalSet implements Journ
}
@Override
- protected void flushAndSync() throws IOException {
+ protected void flushAndSync(final boolean durable) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
- jas.getCurrentStream().flushAndSync();
+ jas.getCurrentStream().flushAndSync(durable);
}
}
}, "flushAndSync");
@@ -511,7 +527,6 @@ public class JournalSet implements Journ
}
}
- @VisibleForTesting
List<JournalAndStream> getAllJournalStreams() {
return journals;
}
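
The switch to CopyOnWriteArrayList means every iterator sees an immutable snapshot, so readers like the web UI can walk the journal list while another thread mutates it. A minimal illustration of the property relied on here:

    List<String> journals = new CopyOnWriteArrayList<String>();
    journals.add("file:/data/1");
    for (String j : journals) {
      journals.add("file:/data/2"); // no ConcurrentModificationException:
                                    // the loop iterates the old snapshot
    }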
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Dec 5 19:22:17 2012
@@ -725,6 +725,12 @@ public class NameNode {
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
checkAllowFormat(conf);
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ InetSocketAddress socAddr = getAddress(conf);
+ SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+ DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+ }
Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
@@ -766,13 +772,13 @@ public class NameNode {
}
@VisibleForTesting
- public static boolean initializeSharedEdits(Configuration conf) {
+ public static boolean initializeSharedEdits(Configuration conf) throws IOException {
return initializeSharedEdits(conf, true);
}
@VisibleForTesting
public static boolean initializeSharedEdits(Configuration conf,
- boolean force) {
+ boolean force) throws IOException {
return initializeSharedEdits(conf, force, false);
}
@@ -786,7 +792,7 @@ public class NameNode {
* @return true if the command aborts, false otherwise
*/
private static boolean initializeSharedEdits(Configuration conf,
- boolean force, boolean interactive) {
+ boolean force, boolean interactive) throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
@@ -797,6 +803,12 @@ public class NameNode {
return false;
}
+ if (UserGroupInformation.isSecurityEnabled()) {
+ InetSocketAddress socAddr = getAddress(conf);
+ SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+ DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+ }
+
NNStorage existingStorage = null;
try {
Configuration confWithoutShared = new Configuration(conf);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java Wed Dec 5 19:22:17 2012
@@ -41,6 +41,14 @@ final class NameNodeResourcePolicy {
static boolean areResourcesAvailable(
Collection<? extends CheckableNameNodeResource> resources,
int minimumRedundantResources) {
+
+ // TODO: workaround:
+ // - during startup, if there are no edits dirs on disk, then there is
+ // a call to areResourcesAvailable() with no dirs at all, which was
+ // previously causing the NN to enter safemode
+ if (resources.isEmpty()) {
+ return true;
+ }
int requiredResourceCount = 0;
int redundantResourceCount = 0;
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Dec 5 19:22:17 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
@@ -60,6 +61,8 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.znerd.xmlenc.XMLOutputter;
+import com.google.common.base.Preconditions;
+
class NamenodeJspHelper {
static String getSafeModeText(FSNamesystem fsn) {
if (!fsn.isInSafeMode())
@@ -212,6 +215,52 @@ class NamenodeJspHelper {
out.print("</table></div>\n");
}
+
+ /**
+ * Generate an HTML report containing the current status of the HDFS
+ * journals.
+ */
+ void generateJournalReport(JspWriter out, NameNode nn,
+ HttpServletRequest request) throws IOException {
+ FSEditLog log = nn.getFSImage().getEditLog();
+ Preconditions.checkArgument(log != null, "no edit log set in %s", nn);
+
+ out.println("<h3> " + nn.getRole() + " Journal Status: </h3>");
+
+ out.println("<b>Current transaction ID:</b> " +
+ nn.getFSImage().getLastAppliedOrWrittenTxId() + "<br/>");
+
+
+ boolean openForWrite = log.isOpenForWrite();
+
+ out.println("<div class=\"dfstable\">");
+ out.println("<table class=\"storage\" title=\"NameNode Journals\">\n"
+ + "<thead><tr><td><b>Journal Manager</b></td><td><b>State</b></td></tr></thead>");
+ for (JournalAndStream jas : log.getJournals()) {
+ out.print("<tr>");
+ out.print("<td>" + jas.getManager());
+ if (jas.isRequired()) {
+ out.print(" [required]");
+ }
+ out.print("</td><td>");
+
+ if (jas.isDisabled()) {
+ out.print("<span class=\"failed\">Failed</span>");
+ } else if (openForWrite) {
+ EditLogOutputStream elos = jas.getCurrentStream();
+ if (elos != null) {
+ out.println(elos.generateHtmlReport());
+ } else {
+ out.println("not currently writing");
+ }
+ } else {
+ out.println("open for read");
+ }
+ out.println("</td></tr>");
+ }
+
+ out.println("</table></div>");
+ }
void generateHealthReport(JspWriter out, NameNode nn,
HttpServletRequest request) throws IOException {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java Wed Dec 5 19:22:17 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import org.apache.commons.logging.Log;
@@ -172,11 +173,20 @@ public class EditLogTailer {
Preconditions.checkState(tailerThread == null ||
!tailerThread.isAlive(),
"Tailer thread should not be running once failover starts");
- try {
- doTailEdits();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
+ // Important to do tailing as the login user, in case the shared
+ // edits storage is implemented by a JournalManager that depends
+ // on security credentials to access the logs (eg QuorumJournalManager).
+ SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ try {
+ doTailEdits();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return null;
+ }
+ });
}
@VisibleForTesting
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java Wed Dec 5 19:22:17 2012
@@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.io.Writable;
import com.google.common.base.Function;
import com.google.common.collect.ComparisonChain;
-public class RemoteEditLog implements Writable, Comparable<RemoteEditLog> {
+public class RemoteEditLog implements Comparable<RemoteEditLog> {
private long startTxId = HdfsConstants.INVALID_TXID;
private long endTxId = HdfsConstants.INVALID_TXID;
+ private boolean isInProgress = false;
public RemoteEditLog() {
}
@@ -36,6 +33,13 @@ public class RemoteEditLog implements Wr
public RemoteEditLog(long startTxId, long endTxId) {
this.startTxId = startTxId;
this.endTxId = endTxId;
+ this.isInProgress = (endTxId == HdfsConstants.INVALID_TXID);
+ }
+
+ public RemoteEditLog(long startTxId, long endTxId, boolean inProgress) {
+ this.startTxId = startTxId;
+ this.endTxId = endTxId;
+ this.isInProgress = inProgress;
}
public long getStartTxId() {
@@ -45,22 +49,18 @@ public class RemoteEditLog implements Wr
public long getEndTxId() {
return endTxId;
}
-
- @Override
- public String toString() {
- return "[" + startTxId + "," + endTxId + "]";
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(startTxId);
- out.writeLong(endTxId);
+ public boolean isInProgress() {
+ return isInProgress;
}
@Override
- public void readFields(DataInput in) throws IOException {
- startTxId = in.readLong();
- endTxId = in.readLong();
+ public String toString() {
+ if (!isInProgress) {
+ return "[" + startTxId + "," + endTxId + "]";
+ } else {
+ return "[" + startTxId + "-? (in-progress)]";
+ }
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java Wed Dec 5 19:22:17 2012
@@ -40,8 +40,8 @@ public class RemoteEditLogManifest {
/**
- * Check that the logs are contiguous and non-overlapping
- * sequences of transactions, in sorted order
+ * Check that the logs are non-overlapping sequences of transactions,
+ * in sorted order. They do not need to be contiguous.
* @throws IllegalStateException if incorrect
*/
private void checkState() {
@@ -50,8 +50,10 @@ public class RemoteEditLogManifest {
RemoteEditLog prev = null;
for (RemoteEditLog log : logs) {
if (prev != null) {
- if (log.getStartTxId() != prev.getEndTxId() + 1) {
- throw new IllegalStateException("Invalid log manifest:" + this);
+ if (log.getStartTxId() <= prev.getEndTxId()) {
+ throw new IllegalStateException(
+ "Invalid log manifest (log " + log + " overlaps " + prev + ")\n"
+ + this);
}
}
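
Under the relaxed check, a manifest may contain gaps between segments but never overlaps. Assuming the list-taking constructor that invokes checkState(), the new behavior is:

    // OK: sorted and non-overlapping; the gap between 100 and 201 is now allowed
    new RemoteEditLogManifest(Lists.newArrayList(
        new RemoteEditLog(1, 100), new RemoteEditLog(201, 300)));

    // throws IllegalStateException: [50,150] overlaps [1,100]
    new RemoteEditLogManifest(Lists.newArrayList(
        new RemoteEditLog(1, 100), new RemoteEditLog(50, 150)));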
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java Wed Dec 5 19:22:17 2012
@@ -56,7 +56,7 @@ public class BinaryEditsVisitor implemen
@Override
public void close(Throwable error) throws IOException {
elfos.setReadyToFlush();
- elfos.flushAndSync();
+ elfos.flushAndSync(true);
elfos.close();
}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.io.Files;
+import com.google.common.primitives.Longs;
+
+/**
+ * Class that represents a file on disk which stores a single <code>long</code>
+ * value, but does not make any effort to make it truly durable. This is in
+ * contrast to {@link PersistentLongFile} which fsync()s the value on every
+ * change.
+ *
+ * This should be used for values which are updated frequently (such that
+ * performance is important) and not required to be up-to-date for correctness.
+ *
+ * This class also differs in that it stores the value as binary data instead
+ * of a textual string.
+ */
+@InterfaceAudience.Private
+public class BestEffortLongFile implements Closeable {
+
+ private final File file;
+ private final long defaultVal;
+
+ private long value;
+
+ private FileChannel ch = null;
+
+ private ByteBuffer buf = ByteBuffer.allocate(Long.SIZE/8);
+
+ public BestEffortLongFile(File file, long defaultVal) {
+ this.file = file;
+ this.defaultVal = defaultVal;
+ }
+
+ public long get() throws IOException {
+ lazyOpen();
+ return value;
+ }
+
+ public void set(long newVal) throws IOException {
+ lazyOpen();
+ buf.clear();
+ buf.putLong(newVal);
+ buf.flip();
+ IOUtils.writeFully(ch, buf, 0);
+ value = newVal;
+ }
+
+ private void lazyOpen() throws IOException {
+ if (ch != null) {
+ return;
+ }
+
+ // Load current value.
+ byte[] data = null;
+ try {
+ data = Files.toByteArray(file);
+ } catch (FileNotFoundException fnfe) {
+ // Expected - this will use default value.
+ }
+
+ if (data != null && data.length != 0) {
+ if (data.length != Longs.BYTES) {
+ throw new IOException("File " + file + " had invalid length: " +
+ data.length);
+ }
+ value = Longs.fromByteArray(data);
+ } else {
+ value = defaultVal;
+ }
+
+ // Now open file for future writes.
+ RandomAccessFile raf = new RandomAccessFile(file, "rw");
+ try {
+ ch = raf.getChannel();
+ } finally {
+ if (ch == null) {
+ IOUtils.closeStream(raf);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (ch != null) {
+ ch.close();
+ }
+ }
+}
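
BestEffortLongFile trades durability for write speed: set() overwrites the 8-byte binary value in place with no fsync. A short usage sketch (path and values hypothetical):

    BestEffortLongFile f =
        new BestEffortLongFile(new File("/data/jn/committed-txid"), 0L);
    try {
      long cur = f.get(); // lazily opens; returns the default if the file is absent
      f.set(cur + 1);     // fast in-place overwrite; may be lost on power failure
    } finally {
      f.close();
    }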
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java Wed Dec 5 19:22:17 2012
@@ -57,7 +57,9 @@ public class PersistentLongFile {
}
public void set(long newVal) throws IOException {
- writeFile(file, newVal);
+ if (value != newVal || !loaded) {
+ writeFile(file, newVal);
+ }
value = newVal;
loaded = true;
}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Wed Dec 5 19:22:17 2012
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.qjournal.protocol";
+option java_outer_classname = "QJournalProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+import "hdfs.proto";
+
+message JournalIdProto {
+ required string identifier = 1;
+}
+
+message RequestInfoProto {
+ required JournalIdProto journalId = 1;
+ required uint64 epoch = 2;
+ required uint64 ipcSerialNumber = 3;
+
+ // Whenever a writer makes a request, it informs
+ // the node of the latest committed txid. This may
+ // be higher than the transaction data included in the
+ // request itself, eg in the case that the node has
+ // fallen behind.
+ optional uint64 committedTxId = 4;
+}
+
+message SegmentStateProto {
+ required uint64 startTxId = 1;
+ required uint64 endTxId = 2;
+ required bool isInProgress = 3;
+}
+
+/**
+ * The storage format used on local disk for previously
+ * accepted decisions.
+ */
+message PersistedRecoveryPaxosData {
+ required SegmentStateProto segmentState = 1;
+ required uint64 acceptedInEpoch = 2;
+}
+
+/**
+ * journal()
+ */
+
+message JournalRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 firstTxnId = 2;
+ required uint32 numTxns = 3;
+ required bytes records = 4;
+ required uint64 segmentTxnId = 5;
+}
+
+message JournalResponseProto {
+}
+
+/**
+ * heartbeat()
+ */
+
+message HeartbeatRequestProto {
+ required RequestInfoProto reqInfo = 1;
+}
+
+message HeartbeatResponseProto { // void response
+}
+
+/**
+ * startLogSegment()
+ */
+message StartLogSegmentRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 txid = 2; // Transaction ID
+}
+
+message StartLogSegmentResponseProto {
+}
+
+/**
+ * finalizeLogSegment()
+ */
+message FinalizeLogSegmentRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 startTxId = 2;
+ required uint64 endTxId = 3;
+}
+
+message FinalizeLogSegmentResponseProto {
+}
+
+/**
+ * purgeLogs()
+ */
+message PurgeLogsRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 minTxIdToKeep = 2;
+}
+
+message PurgeLogsResponseProto {
+}
+
+/**
+ * isFormatted()
+ */
+message IsFormattedRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message IsFormattedResponseProto {
+ required bool isFormatted = 1;
+}
+
+/**
+ * getJournalState()
+ */
+message GetJournalStateRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message GetJournalStateResponseProto {
+ required uint64 lastPromisedEpoch = 1;
+ required uint32 httpPort = 2;
+}
+
+/**
+ * format()
+ */
+message FormatRequestProto {
+ required JournalIdProto jid = 1;
+ required NamespaceInfoProto nsInfo = 2;
+}
+
+message FormatResponseProto {
+}
+
+/**
+ * newEpoch()
+ */
+message NewEpochRequestProto {
+ required JournalIdProto jid = 1;
+ required NamespaceInfoProto nsInfo = 2;
+ required uint64 epoch = 3;
+}
+
+message NewEpochResponseProto {
+ optional uint64 lastSegmentTxId = 1;
+}
+
+/**
+ * getEditLogManifest()
+ */
+message GetEditLogManifestRequestProto {
+ required JournalIdProto jid = 1;
+ required uint64 sinceTxId = 2; // Transaction ID
+}
+
+message GetEditLogManifestResponseProto {
+ required RemoteEditLogManifestProto manifest = 1;
+ required uint32 httpPort = 2;
+
+ // TODO: we should add nsinfo somewhere
+ // to verify that it matches up with our expectation
+ // required NamespaceInfoProto nsInfo = 2;
+}
+
+/**
+ * prepareRecovery()
+ */
+message PrepareRecoveryRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 segmentTxId = 2;
+}
+
+message PrepareRecoveryResponseProto {
+ optional SegmentStateProto segmentState = 1;
+ optional uint64 acceptedInEpoch = 2;
+ required uint64 lastWriterEpoch = 3;
+
+ // The highest committed txid that this logger has ever seen.
+ // This may be higher than the data it actually has, in the case
+ // that it was lagging before the old writer crashed.
+ optional uint64 lastCommittedTxId = 4;
+}
+
+/**
+ * acceptRecovery()
+ */
+message AcceptRecoveryRequestProto {
+ required RequestInfoProto reqInfo = 1;
+
+ /** Details on the segment to recover */
+ required SegmentStateProto stateToAccept = 2;
+
+ /** The URL from which the log may be copied */
+ required string fromURL = 3;
+}
+
+message AcceptRecoveryResponseProto {
+}
+
+
+/**
+ * Protocol used to journal edits to a JournalNode.
+ * See the request and response for details of rpc call.
+ */
+service QJournalProtocolService {
+ rpc isFormatted(IsFormattedRequestProto) returns (IsFormattedResponseProto);
+
+ rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
+
+ rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
+
+ rpc format(FormatRequestProto) returns (FormatResponseProto);
+
+ rpc journal(JournalRequestProto) returns (JournalResponseProto);
+
+ rpc heartbeat(HeartbeatRequestProto) returns (HeartbeatResponseProto);
+
+ rpc startLogSegment(StartLogSegmentRequestProto)
+ returns (StartLogSegmentResponseProto);
+
+ rpc finalizeLogSegment(FinalizeLogSegmentRequestProto)
+ returns (FinalizeLogSegmentResponseProto);
+
+ rpc purgeLogs(PurgeLogsRequestProto)
+ returns (PurgeLogsResponseProto);
+
+ rpc getEditLogManifest(GetEditLogManifestRequestProto)
+ returns (GetEditLogManifestResponseProto);
+
+ rpc prepareRecovery(PrepareRecoveryRequestProto)
+ returns (PrepareRecoveryResponseProto);
+
+ rpc acceptRecovery(AcceptRecoveryRequestProto)
+ returns (AcceptRecoveryResponseProto);
+}
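
Since the file sets java_generic_services = true, protoc generates a QJournalProtocolService.BlockingInterface with one method per rpc declared above. A hedged sketch of a call through such a stub (stub construction and RpcController wiring omitted):

    IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder()
        .setJid(JournalIdProto.newBuilder().setIdentifier("mycluster"))
        .build();
    IsFormattedResponseProto resp = stub.isFormatted(null, req);
    boolean formatted = resp.getIsFormatted();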
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Wed Dec 5 19:22:17 2012
@@ -305,6 +305,7 @@ message BlocksWithLocationsProto {
message RemoteEditLogProto {
required uint64 startTxId = 1; // Starting available edit log transaction
required uint64 endTxId = 2; // Ending available edit log transaction
+ optional bool isInProgress = 3 [default = false];
}
/**
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Dec 5 19:22:17 2012
@@ -260,6 +260,11 @@
</property>
<property>
+ <name>dfs.namenode.edits.journal-plugin.qjournal</name>
+ <value>org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager</value>
+</property>
+
+<property>
<name>dfs.permissions.enabled</name>
<value>true</value>
<description>
@@ -1162,4 +1167,21 @@
</description>
</property>
+<property>
+ <name>dfs.journalnode.rpc-address</name>
+ <value>0.0.0.0:8485</value>
+ <description>
+ The JournalNode RPC server address and port.
+ </description>
+</property>
+
+<property>
+ <name>dfs.journalnode.http-address</name>
+ <value>0.0.0.0:8480</value>
+ <description>
+ The address and port the JournalNode web UI listens on.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
+
</configuration>
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp Wed Dec 5 19:22:17 2012
@@ -60,8 +60,10 @@
<%= NamenodeJspHelper.getCorruptFilesWarning(fsn)%>
<% healthjsp.generateHealthReport(out, nn, request); %>
-<hr>
+<% healthjsp.generateJournalReport(out, nn, request); %>
+<hr/>
<% healthjsp.generateConfReport(out, nn, request); %>
+<hr>
<%
out.println(ServletUtil.htmlFooter());
%>
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html Wed Dec 5 19:22:17 2012
@@ -0,0 +1,29 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=journalstatus.jsp"/>
+<html>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head><title>Hadoop Administration</title></head>
+
+<body>
+<h1>Hadoop Administration</h1>
+
+<ul>
+ <li><a href="journalstatus.jsp">Status</a></li>
+</ul>
+
+</body>
+</html>
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp Wed Dec 5 19:22:17 2012
@@ -0,0 +1,42 @@
+<%
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page
+ contentType="text/html; charset=UTF-8"
+ import="org.apache.hadoop.hdfs.server.common.JspHelper"
+ import="org.apache.hadoop.util.ServletUtil"
+%>
+<%!
+ //for java.io.Serializable
+ private static final long serialVersionUID = 1L;
+%>
+
+<!DOCTYPE html>
+<html>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+<title>Hadoop JournalNode</title>
+
+<body>
+<h1>JournalNode</h1>
+<%= JspHelper.getVersionTable() %>
+<hr />
+
+<br />
+<b><a href="/logs/">Logs</a></b>
+<%= ServletUtil.htmlFooter() %>
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml Wed Dec 5 19:22:17 2012
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee">
+@journal.servlet.definitions@
+</web-app>
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Dec 5 19:22:17 2012
@@ -85,6 +85,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
+import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
/** Utilities for HDFS tests */
@@ -594,12 +595,21 @@ public class DFSTestUtil {
IOUtils.copyBytes(is, os, s.length(), true);
}
- // Returns url content as string.
+ /**
+ * @return url content as string (UTF-8 encoding assumed)
+ */
public static String urlGet(URL url) throws IOException {
+ return new String(urlGetBytes(url), Charsets.UTF_8);
+ }
+
+ /**
+ * @return URL contents as a byte array
+ */
+ public static byte[] urlGetBytes(URL url) throws IOException {
URLConnection conn = url.openConnection();
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
- return out.toString();
+ return out.toByteArray();
}
/**
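
The refactoring separates fetching from decoding so that binary endpoints can reuse the same fetch path. A quick usage sketch (URL is a placeholder):

    URL u = new URL("http://nn.example.com:50070/jmx");
    byte[] raw = DFSTestUtil.urlGetBytes(u); // raw bytes, e.g. a fetched edits file
    String text = DFSTestUtil.urlGet(u);     // same fetch, decoded as UTF-8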
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class MiniJournalCluster {
+ public static class Builder {
+ private String baseDir;
+ private int numJournalNodes = 3;
+ private boolean format = true;
+ private Configuration conf;
+
+ public Builder(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Builder baseDir(String d) {
+ this.baseDir = d;
+ return this;
+ }
+
+ public Builder numJournalNodes(int n) {
+ this.numJournalNodes = n;
+ return this;
+ }
+
+ public Builder format(boolean f) {
+ this.format = f;
+ return this;
+ }
+
+ public MiniJournalCluster build() throws IOException {
+ return new MiniJournalCluster(this);
+ }
+ }
+
+ private static final Log LOG = LogFactory.getLog(MiniJournalCluster.class);
+ private File baseDir;
+ private JournalNode[] nodes;
+ private InetSocketAddress[] ipcAddrs;
+ private InetSocketAddress[] httpAddrs;
+
+ private MiniJournalCluster(Builder b) throws IOException {
+ LOG.info("Starting MiniJournalCluster with " +
+ b.numJournalNodes + " journal nodes");
+
+ if (b.baseDir != null) {
+ this.baseDir = new File(b.baseDir);
+ } else {
+ this.baseDir = new File(MiniDFSCluster.getBaseDirectory());
+ }
+
+ nodes = new JournalNode[b.numJournalNodes];
+ ipcAddrs = new InetSocketAddress[b.numJournalNodes];
+ httpAddrs = new InetSocketAddress[b.numJournalNodes];
+ for (int i = 0; i < b.numJournalNodes; i++) {
+ if (b.format) {
+ File dir = getStorageDir(i);
+ LOG.debug("Fully deleting JN directory " + dir);
+ FileUtil.fullyDelete(dir);
+ }
+ nodes[i] = new JournalNode();
+ nodes[i].setConf(createConfForNode(b, i));
+ nodes[i].start();
+
+ ipcAddrs[i] = nodes[i].getBoundIpcAddress();
+ httpAddrs[i] = nodes[i].getBoundHttpAddress();
+ }
+ }
+
+ /**
+ * Build a qjournal:// URI that points to the set of JournalNodes in this
+ * cluster, suitable for use as an edits directory value.
+ */
+ public URI getQuorumJournalURI(String jid) {
+ List<String> addrs = Lists.newArrayList();
+ for (InetSocketAddress addr : ipcAddrs) {
+ addrs.add("127.0.0.1:" + addr.getPort());
+ }
+ String addrsVal = Joiner.on(";").join(addrs);
+ LOG.debug("Setting logger addresses to: " + addrsVal);
+ try {
+ return new URI("qjournal://" + addrsVal + "/" + jid);
+ } catch (URISyntaxException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ /**
+ * Start the JournalNodes in the cluster.
+ */
+ public void start() throws IOException {
+ for (JournalNode jn : nodes) {
+ jn.start();
+ }
+ }
+
+ /**
+ * Shut down all of the JournalNodes in the cluster.
+ * @throws IOException if one or more nodes failed to stop
+ */
+ public void shutdown() throws IOException {
+ boolean failed = false;
+ for (JournalNode jn : nodes) {
+ try {
+ jn.stopAndJoin(0);
+ } catch (Exception e) {
+ failed = true;
+ LOG.warn("Unable to stop journal node " + jn, e);
+ }
+ }
+ if (failed) {
+ throw new IOException("Unable to shut down. Check log for details");
+ }
+ }
+
+ private Configuration createConfForNode(Builder b, int idx) {
+ Configuration conf = new Configuration(b.conf);
+ File logDir = getStorageDir(idx);
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+ return conf;
+ }
+
+ public File getStorageDir(int idx) {
+ return new File(baseDir, "journalnode-" + idx).getAbsoluteFile();
+ }
+
+ public File getCurrentDir(int idx, String jid) {
+ return new File(new File(getStorageDir(idx), jid), "current");
+ }
+
+ public JournalNode getJournalNode(int i) {
+ return nodes[i];
+ }
+
+ public void restartJournalNode(int i) throws InterruptedException, IOException {
+ Configuration conf = new Configuration(nodes[i].getConf());
+ if (nodes[i].isStarted()) {
+ nodes[i].stopAndJoin(0);
+ }
+
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "127.0.0.1:" +
+ ipcAddrs[i].getPort());
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "127.0.0.1:" +
+ httpAddrs[i].getPort());
+
+ JournalNode jn = new JournalNode();
+ jn.setConf(conf);
+ jn.start();
+ // Track the new instance so getJournalNode(i) and shutdown() see it.
+ nodes[i] = jn;
+ }
+
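+ /** @return the number of nodes required for a quorum: a simple majority. */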
+ public int getQuorumSize() {
+ return nodes.length / 2 + 1;
+ }
+
+ public int getNumNodes() {
+ return nodes.length;
+ }
+
+}
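For orientation, a minimal sketch of driving the harness above; the class name and printed output are illustrative, but every method called appears in the file just added:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;

    public class MiniJournalClusterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Builds and starts three JournalNodes on ephemeral ports.
        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
            .numJournalNodes(3)   // the default, shown for clarity
            .format(true)         // wipe each node's storage dir first
            .build();
        try {
          // e.g. qjournal://127.0.0.1:p1;127.0.0.1:p2;127.0.0.1:p3/myjournal
          URI uri = cluster.getQuorumJournalURI("myjournal");
          System.out.println("quorum URI: " + uri
              + ", quorum size: " + cluster.getQuorumSize());
        } finally {
          cluster.shutdown();
        }
      }
    }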
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.collect.Lists;
+
+public abstract class QJMTestUtil {
+ public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+ 12345, "mycluster", "my-bp", 0L);
+ public static final String JID = "test-journal";
+
+ public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
+ DataOutputBuffer buf = new DataOutputBuffer();
+ FSEditLogOp.Writer writer = new FSEditLogOp.Writer(buf);
+
+ for (long txid = startTxn; txid < startTxn + numTxns; txid++) {
+ FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+ op.setTransactionId(txid);
+ writer.writeOp(op);
+ }
+
+ return Arrays.copyOf(buf.getData(), buf.getLength());
+ }
+
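+ /**
+ * Write numTxns txns starting at startTxId; if finalize is true, close and
+ * finalize the segment and return null, else return the still-open stream.
+ */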
+ public static EditLogOutputStream writeSegment(MiniJournalCluster cluster,
+ QuorumJournalManager qjm, long startTxId, int numTxns,
+ boolean finalize) throws IOException {
+ EditLogOutputStream stm = qjm.startLogSegment(startTxId);
+ // Should create in-progress
+ assertExistsInQuorum(cluster,
+ NNStorage.getInProgressEditsFileName(startTxId));
+
+ writeTxns(stm, startTxId, numTxns);
+ if (finalize) {
+ stm.close();
+ qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
+ return null;
+ } else {
+ return stm;
+ }
+ }
+
+ public static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
+ FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+ op.setTransactionId(txid);
+ stm.write(op);
+ }
+
+ public static void writeTxns(EditLogOutputStream stm, long startTxId, int numTxns)
+ throws IOException {
+ for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
+ writeOp(stm, txid);
+ }
+ stm.setReadyToFlush();
+ stm.flush();
+ }
+
+ /**
+ * Verify that the given list of streams contains exactly the range of
+ * transactions specified, inclusive.
+ */
+ public static void verifyEdits(List<EditLogInputStream> streams,
+ int firstTxnId, int lastTxnId) throws IOException {
+
+ Iterator<EditLogInputStream> iter = streams.iterator();
+ assertTrue(iter.hasNext());
+ EditLogInputStream stream = iter.next();
+
+ for (int expected = firstTxnId;
+ expected <= lastTxnId;
+ expected++) {
+
+ FSEditLogOp op = stream.readOp();
+ while (op == null) {
+ assertTrue("Expected to find txid " + expected + ", " +
+ "but no more streams available to read from",
+ iter.hasNext());
+ stream = iter.next();
+ op = stream.readOp();
+ }
+
+ assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+ assertEquals(expected, op.getTransactionId());
+ }
+
+ assertNull(stream.readOp());
+ assertFalse("Expected no more txns after " + lastTxnId +
+ " but more streams are available", iter.hasNext());
+ }
+
+
+ public static void assertExistsInQuorum(MiniJournalCluster cluster,
+ String fname) {
+ int count = 0;
+ for (int i = 0; i < cluster.getNumNodes(); i++) {
+ File dir = cluster.getCurrentDir(i, JID);
+ if (new File(dir, fname).exists()) {
+ count++;
+ }
+ }
+ assertTrue("File " + fname + " should exist in a quorum of dirs",
+ count >= cluster.getQuorumSize());
+ }
+
+ public static long recoverAndReturnLastTxn(QuorumJournalManager qjm)
+ throws IOException {
+ qjm.recoverUnfinalizedSegments();
+ long lastRecoveredTxn = 0;
+
+ List<EditLogInputStream> streams = Lists.newArrayList();
+ try {
+ qjm.selectInputStreams(streams, 0, false);
+
+ for (EditLogInputStream elis : streams) {
+ assertTrue(elis.getFirstTxId() > lastRecoveredTxn);
+ lastRecoveredTxn = elis.getLastTxId();
+ }
+ } finally {
+ IOUtils.cleanup(null, streams.toArray(new Closeable[0]));
+ }
+ return lastRecoveredTxn;
+ }
+}
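Putting the helpers together, a sketch of a full round trip through a quorum. It assumes the QuorumJournalManager (Configuration, URI, NamespaceInfo) constructor and a format(NamespaceInfo) call as exposed by the QJM client API; the class name is illustrative:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
    import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
    import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
    import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
    import com.google.common.collect.Lists;

    public class QJMRoundTripExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
        QuorumJournalManager qjm = new QuorumJournalManager(conf,
            cluster.getQuorumJournalURI(QJMTestUtil.JID), QJMTestUtil.FAKE_NSINFO);
        try {
          qjm.format(QJMTestUtil.FAKE_NSINFO);  // assumed API; formats all JNs
          qjm.recoverUnfinalizedSegments();
          // Write and finalize txids 1..5 across the quorum.
          QJMTestUtil.writeSegment(cluster, qjm, 1, 5, true);
          // Read the edits back and verify exactly txids 1..5 are present.
          List<EditLogInputStream> streams = Lists.newArrayList();
          qjm.selectInputStreams(streams, 1, false);
          QJMTestUtil.verifyEdits(streams, 1, 5);
        } finally {
          qjm.close();
          cluster.shutdown();
        }
      }
    }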
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.junit.Test;
+
+
+public class TestMiniJournalCluster {
+ @Test
+ public void testStartStop() throws IOException {
+ Configuration conf = new Configuration();
+ MiniJournalCluster c = new MiniJournalCluster.Builder(conf)
+ .build();
+ try {
+ URI uri = c.getQuorumJournalURI("myjournal");
+ String[] addrs = uri.getAuthority().split(";");
+ assertEquals(3, addrs.length);
+
+ JournalNode node = c.getJournalNode(0);
+ String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
+ assertEquals(
+ new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0")
+ .getAbsolutePath(),
+ dir);
+ } finally {
+ c.shutdown();
+ }
+ }
+}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ExitUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNNWithQJM {
+ Configuration conf = new HdfsConfiguration();
+ private MiniJournalCluster mjc;
+ private Path TEST_PATH = new Path("/test-dir");
+ private Path TEST_PATH_2 = new Path("/test-dir-2");
+
+ @Before
+ public void resetSystemExit() {
+ ExitUtil.resetFirstExitException();
+ }
+
+ @Before
+ public void startJNs() throws Exception {
+ mjc = new MiniJournalCluster.Builder(conf).build();
+ }
+
+ @After
+ public void stopJNs() throws Exception {
+ if (mjc != null) {
+ mjc.shutdown();
+ }
+ }
+
+ @Test
+ public void testLogAndRestart() throws IOException {
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ mjc.getQuorumJournalURI("myjournal").toString());
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0)
+ .manageNameDfsDirs(false)
+ .build();
+ try {
+ cluster.getFileSystem().mkdirs(TEST_PATH);
+
+ // Restart the NN and make sure the edit was persisted
+ // and loaded again
+ cluster.restartNameNode();
+
+ assertTrue(cluster.getFileSystem().exists(TEST_PATH));
+ cluster.getFileSystem().mkdirs(TEST_PATH_2);
+
+ // Restart the NN again and make sure both edits are persisted.
+ cluster.restartNameNode();
+ assertTrue(cluster.getFileSystem().exists(TEST_PATH));
+ assertTrue(cluster.getFileSystem().exists(TEST_PATH_2));
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testNewNamenodeTakesOverWriter() throws Exception {
+ File nn1Dir = new File(
+ MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn1");
+ File nn2Dir = new File(
+ MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn2");
+
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ nn1Dir.getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ mjc.getQuorumJournalURI("myjournal").toString());
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0)
+ .manageNameDfsDirs(false)
+ .checkExitOnShutdown(false)
+ .build();
+
+ try {
+ cluster.getFileSystem().mkdirs(TEST_PATH);
+
+ // Start a second NN pointed to the same quorum.
+ // We need to copy the image dir from the first NN -- or else
+ // the new NN will just be rejected because of Namespace mismatch.
+ FileUtil.fullyDelete(nn2Dir);
+ FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(),
+ new Path(nn2Dir.getAbsolutePath()), false, conf);
+
+ Configuration conf2 = new Configuration();
+ conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ nn2Dir.getAbsolutePath());
+ conf2.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ mjc.getQuorumJournalURI("myjournal").toString());
+ MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2)
+ .numDataNodes(0)
+ .format(false)
+ .manageNameDfsDirs(false)
+ .build();
+
+ // Check that the new cluster sees the edits made on the old cluster
+ try {
+ assertTrue(cluster2.getFileSystem().exists(TEST_PATH));
+ } finally {
+ cluster2.shutdown();
+ }
+
+ // Check that, if we try to write to the old NN
+ // that it aborts.
+ try {
+ cluster.getFileSystem().mkdirs(new Path("/x"));
+ fail("Did not abort trying to write to a fenced NN");
+ } catch (RemoteException re) {
+ GenericTestUtils.assertExceptionContains(
+ "Could not sync enough journals to persistent storage", re);
+ }
+ } finally {
+ //cluster.shutdown(); // intentionally skipped: the fenced NN has aborted, so a clean shutdown is not expected to work
+ }
+ }
+
+ @Test
+ public void testMismatchedNNIsRejected() throws Exception {
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ mjc.getQuorumJournalURI("myjournal").toString());
+
+ // Start a NN, so the storage is formatted -- both on-disk
+ // and QJM.
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0)
+ .manageNameDfsDirs(false)
+ .build();
+ cluster.shutdown();
+
+ // Reformat just the on-disk portion
+ Configuration onDiskOnly = new Configuration(conf);
+ onDiskOnly.unset(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
+ NameNode.format(onDiskOnly);
+
+ // Start the NN - should fail because the JNs are still formatted
+ // with the old namespace ID.
+ try {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0)
+ .manageNameDfsDirs(false)
+ .format(false)
+ .build();
+ fail("New NN with different namespace should have been rejected");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "Unable to start log segment 1: too few journals", ioe);
+ }
+ }
+
+ @Test
+ public void testWebPageHasQjmInfo() throws Exception {
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ mjc.getQuorumJournalURI("myjournal").toString());
+ // Speed up the test
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0)
+ .manageNameDfsDirs(false)
+ .build();
+ try {
+ URL url = new URL("http://localhost:"
+ + NameNode.getHttpAddress(cluster.getConfiguration(0)).getPort()
+ + "/dfshealth.jsp");
+
+ cluster.getFileSystem().mkdirs(TEST_PATH);
+
+ String contents = DFSTestUtil.urlGet(url);
+ assertTrue(contents.contains("QJM to ["));
+ assertTrue(contents.contains("Written txid 2"));
+
+ // Stop one JN, do another txn, and make sure it shows as
+ // stuck behind the others.
+ mjc.getJournalNode(0).stopAndJoin(0);
+
+ cluster.getFileSystem().delete(TEST_PATH, true);
+
+ contents = DFSTestUtil.urlGet(url);
+ System.out.println(contents);
+ assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
+ .find());
+
+ // Restart NN while JN0 is still down.
+ cluster.restartNameNode();
+
+ contents = DFSTestUtil.urlGet(url);
+ System.out.println(contents);
+ assertTrue(Pattern.compile("never written").matcher(contents)
+ .find());
+
+
+ } finally {
+ cluster.shutdown();
+ }
+
+ }
+}
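Outside of tests, the wiring these tests set up with conf.set(...) corresponds to hdfs-site.xml entries of roughly this shape; the hostnames and journal ID are placeholders, and 8485 is the JournalNode RPC default:

    <property>
      <name>dfs.namenode.edits.dir</name>
      <value>qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/myjournal</value>
    </property>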