You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/15 02:48:12 UTC
svn commit: r1373177 - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/qjournal/client/
src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/
src/main/java/org/apache/hadoop/hdfs/qjournal/...
Author: todd
Date: Wed Aug 15 00:48:11 2012
New Revision: 1373177
URL: http://svn.apache.org/viewvc?rev=1373177&view=rev
Log:
HDFS-3793. Implement genericized format() in QJM. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Aug 15 00:48:11 2012
@@ -14,3 +14,5 @@ HDFS-3725. Fix QJM startup when individu
HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)
HDFS-3773. TestNNWithQJM fails after HDFS-3741. (atm)
+
+HDFS-3793. Implement genericized format() in QJM (todd)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Wed Aug 15 00:48:11 2012
@@ -83,6 +83,12 @@ interface AsyncLogger {
public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
/**
+ * Format the log directory.
+ * @param nsInfo the namespace info to format with
+ */
+ public ListenableFuture<Void> format(NamespaceInfo nsInfo);
+
+ /**
* @return the state of the last epoch on the target node.
*/
public ListenableFuture<GetJournalStateResponseProto> getJournalState();
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Wed Aug 15 00:48:11 2012
@@ -25,18 +25,23 @@ import java.util.concurrent.TimeoutExcep
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* Wrapper around a set of Loggers, taking care of fanning out
@@ -197,6 +202,36 @@ class AsyncLoggerSet {
}
return QuorumCall.create(calls);
}
+
+ public QuorumCall<AsyncLogger, Boolean> isFormatted() {
+ Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ final SettableFuture<Boolean> ret = SettableFuture.create();
+ ListenableFuture<GetJournalStateResponseProto> jstate =
+ logger.getJournalState();
+ Futures.addCallback(jstate, new FutureCallback<GetJournalStateResponseProto>() {
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof RemoteException) {
+ t = ((RemoteException)t).unwrapRemoteException();
+ }
+ if (t instanceof JournalNotFormattedException) {
+ ret.set(false);
+ } else {
+ ret.setException(t);
+ }
+ }
+
+ @Override
+ public void onSuccess(GetJournalStateResponseProto jstate) {
+ ret.set(true);
+ }
+ });
+
+ calls.put(logger, ret);
+ }
+ return QuorumCall.create(calls);
+ }
private QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
NamespaceInfo nsInfo,
@@ -275,4 +310,15 @@ class AsyncLoggerSet {
}
return QuorumCall.create(calls);
}
+
+ QuorumCall<AsyncLogger,Void> format(NamespaceInfo nsInfo) {
+ Map<AsyncLogger, ListenableFuture<Void>> calls =
+ Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ ListenableFuture<Void> future =
+ logger.format(nsInfo);
+ calls.put(logger, future);
+ }
+ return QuorumCall.create(calls);
+ }
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Wed Aug 15 00:48:11 2012
@@ -281,6 +281,17 @@ public class IPCLoggerChannel implements
}
@Override
+ public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
+ return executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ getProxy().format(journalId, nsInfo);
+ return null;
+ }
+ });
+ }
+
+ @Override
public ListenableFuture<Void> startLogSegment(final long txid) {
return executor.submit(new Callable<Void>() {
@Override
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Aug 15 00:48:11 2012
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -68,6 +69,12 @@ public class QuorumJournalManager implem
private final int acceptRecoveryTimeoutMs;
private final int finalizeSegmentTimeoutMs;
private final int selectInputStreamsTimeoutMs;
+
+ // Since these don't occur during normal operation, we can
+ // use rather lengthy timeouts, and don't need to make them
+ // configurable.
+ private static final int FORMAT_TIMEOUT_MS = 60000;
+ private static final int HASDATA_TIMEOUT_MS = 60000;
private final Configuration conf;
private final URI uri;
@@ -133,6 +140,52 @@ public class QuorumJournalManager implem
"bad journal id: " + jid);
}
+ @Override
+ public void format(NamespaceInfo nsInfo) throws IOException {
+ QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for format() response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for format() response");
+ }
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException("Could not format one or more JournalNodes");
+ }
+ }
+
+ @Override
+ public boolean hasSomeData() throws IOException {
+ QuorumCall<AsyncLogger, Boolean> call =
+ loggers.isFormatted();
+
+ try {
+ call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while determining if JNs have data");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for response from loggers");
+ }
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException(
+ "Unable to check if JNs are ready for formatting");
+ }
+
+ // If any of the loggers returned with a non-empty manifest, then
+ // we should prompt for format.
+ for (Boolean hasData : call.getResults().values()) {
+ if (hasData) {
+ return true;
+ }
+ }
+
+ // Otherwise, none were formatted, we can safely format.
+ return false;
+ }
+
/**
* Run recovery/synchronization for a specific segment.
* Postconditions:
@@ -278,7 +331,7 @@ public class QuorumJournalManager implem
}
return addrs;
}
-
+
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
Preconditions.checkState(isActiveWriter,
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java?rev=1373177&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java Wed Aug 15 00:48:11 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+
+/**
+ * Exception indicating that a call has been made to a JournalNode
+ * which is not yet formatted.
+ */
+@InterfaceAudience.Private
+public class JournalNotFormattedException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public JournalNotFormattedException(String msg) {
+ super(msg);
+ }
+
+}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Wed Aug 15 00:48:11 2012
@@ -55,6 +55,12 @@ public interface QJournalProtocol {
throws IOException;
/**
+ * Format the underlying storage for the given namespace.
+ */
+ public void format(String journalId,
+ NamespaceInfo nsInfo) throws IOException;
+
+ /**
* Begin a new epoch. See the HDFS-3077 design doc for details.
*/
public NewEpochResponseProto newEpoch(String journalId,
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Wed Aug 15 00:48:11 2012
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.qjournal.p
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -90,6 +92,17 @@ public class QJournalProtocolServerSideT
}
}
+ public FormatResponseProto format(RpcController controller,
+ FormatRequestProto request) throws ServiceException {
+ try {
+ impl.format(request.getJid().getIdentifier(),
+ PBHelper.convert(request.getNsInfo()));
+ return FormatResponseProto.getDefaultInstance();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
/** @see JournalProtocol#journal */
@Override
public JournalResponseProto journal(RpcController unused,
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Wed Aug 15 00:48:11 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocolPB
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -93,6 +94,19 @@ public class QJournalProtocolTranslatorP
.setIdentifier(jid)
.build();
}
+
+ @Override
+ public void format(String jid, NamespaceInfo nsInfo) throws IOException {
+ try {
+ FormatRequestProto req = FormatRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setNsInfo(PBHelper.convert(nsInfo))
+ .build();
+ rpcProxy.format(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
@Override
public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Wed Aug 15 00:48:11 2012
@@ -105,37 +105,28 @@ class JNStorage extends Storage {
setStorageInfo(nsInfo);
LOG.info("Formatting journal storage directory " +
sd + " with nsid: " + getNamespaceID());
+ // Unlock the directory before formatting, because we will
+ // re-analyze it after format(). The analyzeStorage() call
+ // below is responsible for re-locking it. This is a no-op
+ // if the storage is not currently locked.
+ unlockAll();
sd.clearDirectory();
writeProperties(sd);
if (!getPaxosDir().mkdirs()) {
throw new IOException("Could not create paxos dir: " + getPaxosDir());
}
- }
-
- public void formatIfNecessary(NamespaceInfo nsInfo) throws IOException {
- if (state == StorageState.NOT_FORMATTED ||
- state == StorageState.NON_EXISTENT) {
- format(nsInfo);
- analyzeStorage();
- assert state == StorageState.NORMAL :
- "Unexpected state after formatting: " + state;
- } else {
- Preconditions.checkState(state == StorageState.NORMAL,
- "Unhandled storage state in %s: %s", this, state);
- assert getNamespaceID() != 0;
-
- checkConsistentNamespace(nsInfo);
- }
+ analyzeStorage();
}
- private void analyzeStorage() throws IOException {
+
+ void analyzeStorage() throws IOException {
this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
if (state == StorageState.NORMAL) {
readProperties(sd);
}
}
- private void checkConsistentNamespace(NamespaceInfo nsInfo)
+ void checkConsistentNamespace(NamespaceInfo nsInfo)
throws IOException {
if (nsInfo.getNamespaceID() != getNamespaceID()) {
throw new IOException("Incompatible namespaceID for journal " +
@@ -155,4 +146,8 @@ class JNStorage extends Storage {
LOG.info("Closing journal storage for " + sd);
unlockAll();
}
+
+ public boolean isFormatted() {
+ return state == StorageState.NORMAL;
+ }
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Aug 15 00:48:11 2012
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
@@ -114,6 +115,8 @@ class Journal implements Closeable {
Preconditions.checkState(nsInfo.getNamespaceID() != 0,
"can't format with uninitialized namespace info: %s",
nsInfo);
+ LOG.info("Formatting " + this + " with namespace info: " +
+ nsInfo);
storage.format(nsInfo);
}
@@ -134,6 +137,7 @@ class Journal implements Closeable {
* any lower epoch, or 0 if no promises have been made.
*/
synchronized long getLastPromisedEpoch() throws IOException {
+ checkFormatted();
return lastPromisedEpoch.get();
}
@@ -150,9 +154,8 @@ class Journal implements Closeable {
synchronized NewEpochResponseProto newEpoch(
NamespaceInfo nsInfo, long epoch) throws IOException {
- // If the storage is unformatted, format it with this NS.
- // Otherwise, check that the NN's nsinfo matches the storage.
- storage.formatIfNecessary(nsInfo);
+ checkFormatted();
+ storage.checkConsistentNamespace(nsInfo);
if (epoch <= getLastPromisedEpoch()) {
throw new IOException("Proposed epoch " + epoch + " <= last promise " +
@@ -185,6 +188,7 @@ class Journal implements Closeable {
synchronized void journal(RequestInfo reqInfo, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkRequest(reqInfo);
+ checkFormatted();
// TODO: if a JN goes down and comes back up, then it will throw
// this exception on every edit. We should instead send back
@@ -226,6 +230,13 @@ class Journal implements Closeable {
// TODO: some check on serial number that they only increase from a given
// client
}
+
+ private void checkFormatted() throws JournalNotFormattedException {
+ if (!storage.isFormatted()) {
+ throw new JournalNotFormattedException("Journal " + storage +
+ " not formatted");
+ }
+ }
/**
* Start a new segment at the given txid. The previous segment
@@ -235,6 +246,7 @@ class Journal implements Closeable {
throws IOException {
assert fjm != null;
checkRequest(reqInfo);
+ checkFormatted();
Preconditions.checkState(curSegment == null,
"Can't start a log segment, already writing " + curSegment);
@@ -251,6 +263,7 @@ class Journal implements Closeable {
public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
long endTxId) throws IOException {
checkRequest(reqInfo);
+ checkFormatted();
if (startTxId == curSegmentTxId) {
if (curSegment != null) {
@@ -284,6 +297,7 @@ class Journal implements Closeable {
public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
long minTxIdToKeep) throws IOException {
checkRequest(reqInfo);
+ checkFormatted();
fjm.purgeLogsOlderThan(minTxIdToKeep);
purgePaxosDecisionsOlderThan(minTxIdToKeep);
@@ -320,6 +334,8 @@ class Journal implements Closeable {
throws IOException {
// No need to checkRequest() here - anyone may ask for the list
// of segments.
+ checkFormatted();
+
RemoteEditLogManifest manifest = new RemoteEditLogManifest(
fjm.getRemoteEditLogs(sinceTxId));
return manifest;
@@ -360,6 +376,7 @@ class Journal implements Closeable {
public synchronized PrepareRecoveryResponseProto prepareRecovery(
RequestInfo reqInfo, long segmentTxId) throws IOException {
checkRequest(reqInfo);
+ checkFormatted();
PrepareRecoveryResponseProto.Builder builder =
PrepareRecoveryResponseProto.newBuilder();
@@ -388,6 +405,7 @@ class Journal implements Closeable {
SegmentStateProto segment, URL fromUrl)
throws IOException {
checkRequest(reqInfo);
+ checkFormatted();
long segmentTxId = segment.getStartTxId();
// TODO: right now, a recovery of a segment when the log is
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Wed Aug 15 00:48:11 2012
@@ -108,6 +108,11 @@ class JournalNodeRpcServer implements QJ
return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
}
+ @Override
+ public void format(String journalId, NamespaceInfo nsInfo)
+ throws IOException {
+ jn.getOrCreateJournal(journalId).format(nsInfo);
+ }
@Override
public void journal(RequestInfo reqInfo, long firstTxnId,
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Wed Aug 15 00:48:11 2012
@@ -110,6 +110,17 @@ message GetJournalStateResponseProto {
}
/**
+ * format()
+ */
+message FormatRequestProto {
+ required JournalIdProto jid = 1;
+ required NamespaceInfoProto nsInfo = 2;
+}
+
+message FormatResponseProto {
+}
+
+/**
* newEpoch()
*/
message NewEpochRequestProto {
@@ -178,6 +189,8 @@ service QJournalProtocolService {
rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
+ rpc format(FormatRequestProto) returns (FormatResponseProto);
+
rpc journal(JournalRequestProto) returns (JournalResponseProto);
rpc startLogSegment(StartLogSegmentRequestProto)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Wed Aug 15 00:48:11 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil;
@@ -157,19 +158,26 @@ public class TestNNWithQJM {
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
- // Start a NN, so the storage is formatted with its namespace info.
+ // Start a NN, so the storage is formatted -- both on-disk
+ // and QJM.
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.manageNameDfsDirs(false)
.build();
cluster.shutdown();
- // Create a new (freshly-formatted) NN, which should not be able to
- // reuse the same journal, since its journal ID would not match.
+ // Reformat just the on-disk portion
+ Configuration onDiskOnly = new Configuration(conf);
+ onDiskOnly.unset(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
+ NameNode.format(onDiskOnly);
+
+ // Start the NN - should fail because the JNs are still formatted
+ // with the old namespace ID.
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.manageNameDfsDirs(false)
+ .format(false)
.build();
fail("New NN with different namespace should have been rejected");
} catch (IOException ioe) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Wed Aug 15 00:48:11 2012
@@ -54,6 +54,10 @@ public class TestEpochsAreUnique {
Configuration conf = new Configuration();
MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
URI uri = cluster.getQuorumJournalURI(JID);
+ QuorumJournalManager qjm = new QuorumJournalManager(
+ conf, uri, FAKE_NSINFO);
+ qjm.format(FAKE_NSINFO);
+
try {
// With no failures or contention, epochs should increase one-by-one
for (int i = 0; i < 5; i++) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Wed Aug 15 00:48:11 2012
@@ -76,6 +76,7 @@ public class TestQJMWithFaults {
MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
try {
QuorumJournalManager qjm = createInjectableQJM(cluster);
+ qjm.format(FAKE_NSINFO);
doWorkload(cluster, qjm);
SortedSet<Integer> ipcCounts = Sets.newTreeSet();
@@ -118,6 +119,7 @@ public class TestQJMWithFaults {
try {
QuorumJournalManager qjm;
qjm = createInjectableQJM(cluster);
+ qjm.format(FAKE_NSINFO);
List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
failIpcNumber(loggers.get(0), failA);
failIpcNumber(loggers.get(1), failB);
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Wed Aug 15 00:48:11 2012
@@ -84,6 +84,7 @@ public class TestQuorumJournalManager {
qjm = createSpyingQJM();
spies = qjm.getLoggerSetForTests().getLoggersForTests();
+ qjm.format(QJMTestUtil.FAKE_NSINFO);
qjm.recoverUnfinalizedSegments();
assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
@@ -110,6 +111,15 @@ public class TestQuorumJournalManager {
}
@Test
+ public void testFormat() throws Exception {
+ QuorumJournalManager qjm = new QuorumJournalManager(
+ conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO);
+ assertFalse(qjm.hasSomeData());
+ qjm.format(FAKE_NSINFO);
+ assertTrue(qjm.hasSomeData());
+ }
+
+ @Test
public void testReaderWhileAnotherWrites() throws Exception {
QuorumJournalManager readerQjm = createSpyingQJM();
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Wed Aug 15 00:48:11 2012
@@ -85,6 +85,8 @@ public class TestQuorumJournalManagerUni
futureReturns(
NewEpochResponseProto.newBuilder().build()
).when(logger).newEpoch(Mockito.anyLong());
+
+ futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any());
}
qjm.recoverUnfinalizedSegments();
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Aug 15 00:48:11 2012
@@ -61,6 +61,7 @@ public class TestJournal {
public void setup() throws Exception {
FileUtil.fullyDelete(TEST_LOG_DIR);
journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+ journal.format(FAKE_NSINFO);
}
@After
@@ -130,18 +131,14 @@ public class TestJournal {
@Test
public void testJournalLocking() throws Exception {
+ Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
StorageDirectory sd = journal.getStorage().getStorageDir(0);
File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK);
-
- // Journal should not be locked, since we lazily initialize it.
- assertFalse(lockFile.exists());
-
- journal.newEpoch(FAKE_NSINFO, 1);
- Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
- // Journal should be locked
+ // Journal should be locked, since the format() call locks it.
GenericTestUtils.assertExists(lockFile);
-
+
+ journal.newEpoch(FAKE_NSINFO, 1);
try {
new Journal(TEST_LOG_DIR, mockErrorReporter);
fail("Did not fail to create another journal in same dir");
@@ -153,6 +150,7 @@ public class TestJournal {
journal.close();
// Journal should no longer be locked after the close() call.
+ // Hence, should be able to create a new Journal in the same dir.
Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
journal2.newEpoch(FAKE_NSINFO, 2);
}