Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/07/20 02:25:52 UTC
svn commit: r1363596 [2/3] - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocolPB/
src/main/java/org/apache/hadoop/hdfs...
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.net.URL;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link QJournalProtocolPB} to the
+ * {@link QJournalProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolPB {
+ /** Server side implementation to delegate the requests to */
+ private final QJournalProtocol impl;
+
+ public QJournalProtocolServerSideTranslatorPB(QJournalProtocol impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public GetJournalStateResponseProto getJournalState(RpcController controller,
+ GetJournalStateRequestProto request) throws ServiceException {
+ try {
+ return impl.getJournalState(
+ convert(request.getJid()));
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ private String convert(JournalIdProto jid) {
+ return jid.getIdentifier();
+ }
+
+ @Override
+ public NewEpochResponseProto newEpoch(RpcController controller,
+ NewEpochRequestProto request) throws ServiceException {
+ try {
+ return impl.newEpoch(
+ request.getJid().getIdentifier(),
+ PBHelper.convert(request.getNsInfo()),
+ request.getEpoch());
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+  /** @see QJournalProtocol#journal */
+ @Override
+ public JournalResponseProto journal(RpcController unused,
+ JournalRequestProto req) throws ServiceException {
+ try {
+ impl.journal(convert(req.getReqInfo()),
+ req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
+ .toByteArray());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return JournalResponseProto.newBuilder().build();
+ }
+
+  /** @see QJournalProtocol#startLogSegment */
+ @Override
+ public StartLogSegmentResponseProto startLogSegment(RpcController controller,
+ StartLogSegmentRequestProto req) throws ServiceException {
+ try {
+ impl.startLogSegment(convert(req.getReqInfo()),
+ req.getTxid());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return StartLogSegmentResponseProto.newBuilder().build();
+ }
+
+ @Override
+ public FinalizeLogSegmentResponseProto finalizeLogSegment(
+ RpcController controller, FinalizeLogSegmentRequestProto req)
+ throws ServiceException {
+ try {
+ impl.finalizeLogSegment(convert(req.getReqInfo()),
+ req.getStartTxId(), req.getEndTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return FinalizeLogSegmentResponseProto.newBuilder().build();
+ }
+
+ @Override
+ public GetEditLogManifestResponseProto getEditLogManifest(
+ RpcController controller, GetEditLogManifestRequestProto request)
+ throws ServiceException {
+ try {
+ return impl.getEditLogManifest(
+ request.getJid().getIdentifier(),
+ request.getSinceTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+
+ @Override
+ public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
+ PrepareRecoveryRequestProto request) throws ServiceException {
+ try {
+ return impl.prepareRecovery(convert(request.getReqInfo()),
+ request.getSegmentTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public AcceptRecoveryResponseProto acceptRecovery(RpcController controller,
+ AcceptRecoveryRequestProto request) throws ServiceException {
+ try {
+ impl.acceptRecovery(convert(request.getReqInfo()),
+ request.getStateToAccept(),
+ new URL(request.getFromURL()));
+ return AcceptRecoveryResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+
+ private RequestInfo convert(
+ QJournalProtocolProtos.RequestInfoProto reqInfo) {
+ return new RequestInfo(
+ reqInfo.getJournalId().getIdentifier(),
+ reqInfo.getEpoch(),
+ reqInfo.getIpcSerialNumber());
+ }
+}
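For context, a minimal sketch (not part of this commit) of how such a server-side translator gets wired up, assuming the protoc-generated QJournalProtocolService nested in QJournalProtocolProtos and that QJournalProtocolPB extends its BlockingInterface, per the usual Hadoop *PB convention. The resulting BlockingService would then be registered with a Hadoop RPC server; JournalNodeRpcServer, later in this commit, does the real wiring.

    import com.google.protobuf.BlockingService;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;

    class TranslatorWiringSketch {
      // Wrap a QJournalProtocol implementation so that incoming protobuf
      // RPC calls are decoded and delegated to it via the translator.
      static BlockingService wrap(QJournalProtocol impl) {
        QJournalProtocolServerSideTranslatorPB translator =
            new QJournalProtocolServerSideTranslatorPB(impl);
        return QJournalProtocolProtos.QJournalProtocolService
            .newReflectiveBlockingService(translator);
      }
    }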
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * the {@link QJournalProtocol} interface to the RPC server implementing
+ * {@link QJournalProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
+ QJournalProtocol, Closeable {
+ /** RpcController is not used and hence is set to null */
+ private final static RpcController NULL_CONTROLLER = null;
+ private final QJournalProtocolPB rpcProxy;
+
+ public QJournalProtocolTranslatorPB(QJournalProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+
+ @Override
+ public GetJournalStateResponseProto getJournalState(String jid)
+ throws IOException {
+ try {
+ GetJournalStateRequestProto req = GetJournalStateRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .build();
+ return rpcProxy.getJournalState(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ private JournalIdProto convertJournalId(String jid) {
+ return JournalIdProto.newBuilder()
+ .setIdentifier(jid)
+ .build();
+ }
+
+ @Override
+ public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
+ long epoch) throws IOException {
+ try {
+ NewEpochRequestProto req = NewEpochRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setNsInfo(PBHelper.convert(nsInfo))
+ .setEpoch(epoch)
+ .build();
+ return rpcProxy.newEpoch(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void journal(RequestInfo reqInfo, long firstTxnId, int numTxns,
+ byte[] records) throws IOException {
+ JournalRequestProto req = JournalRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setFirstTxnId(firstTxnId)
+ .setNumTxns(numTxns)
+ .setRecords(PBHelper.getByteString(records))
+ .build();
+ try {
+ rpcProxy.journal(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ private QJournalProtocolProtos.RequestInfoProto convert(
+ RequestInfo reqInfo) {
+ return QJournalProtocolProtos.RequestInfoProto.newBuilder()
+ .setJournalId(convertJournalId(reqInfo.getJournalId()))
+ .setEpoch(reqInfo.getEpoch())
+ .setIpcSerialNumber(reqInfo.getIpcSerialNumber())
+ .build();
+ }
+
+ @Override
+ public void startLogSegment(RequestInfo reqInfo, long txid)
+ throws IOException {
+ StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setTxid(txid)
+ .build();
+ try {
+ rpcProxy.startLogSegment(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+ long endTxId) throws IOException {
+ FinalizeLogSegmentRequestProto req =
+ FinalizeLogSegmentRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setStartTxId(startTxId)
+ .setEndTxId(endTxId)
+ .build();
+ try {
+ rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public GetEditLogManifestResponseProto getEditLogManifest(String jid,
+ long sinceTxId) throws IOException {
+ try {
+ return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
+ GetEditLogManifestRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setSinceTxId(sinceTxId)
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+ long segmentTxId) throws IOException {
+ try {
+ return rpcProxy.prepareRecovery(NULL_CONTROLLER,
+ PrepareRecoveryRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setSegmentTxId(segmentTxId)
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void acceptRecovery(RequestInfo reqInfo,
+ SegmentStateProto stateToAccept, URL fromUrl) throws IOException {
+ try {
+ rpcProxy.acceptRecovery(NULL_CONTROLLER,
+ AcceptRecoveryRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setStateToAccept(stateToAccept)
+ .setFromURL(fromUrl.toExternalForm())
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+  @Override
+  public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy,
+ QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
+ }
+}
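On the client side, a hedged sketch of how a caller could obtain a QJournalProtocol view of a single JournalNode through this translator. RPC.getProxy and ProtobufRpcEngine are the standard Hadoop IPC entry points; the engine registration mirrors what other *PB clients in this tree do, but the exact call site for QJM lives elsewhere in this commit.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;

    class QJournalClientSketch {
      static QJournalProtocol connect(InetSocketAddress addr, Configuration conf)
          throws IOException {
        // Route QJournalProtocolPB calls through the protobuf engine...
        RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
            ProtobufRpcEngine.class);
        QJournalProtocolPB proxy = RPC.getProxy(QJournalProtocolPB.class,
            RPC.getProtocolVersion(QJournalProtocolPB.class), addr, conf);
        // ...and hide the generated proto types behind the translator.
        return new QJournalProtocolTranslatorPB(proxy);
      }
    }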
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This servlet is used in two cases:
+ * <ul>
+ * <li>The QuorumJournalManager, when reading edits, fetches the edit streams
+ * from the journal nodes.</li>
+ * <li>During edits synchronization, one journal node will fetch edits from
+ * another journal node.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+public class GetJournalEditServlet extends HttpServlet {
+
+ private static final long serialVersionUID = -4635891628211723009L;
+ private static final Log LOG = LogFactory.getLog(GetJournalEditServlet.class);
+
+ static final String STORAGEINFO_PARAM = "storageInfo";
+ static final String JOURNAL_ID_PARAM = "jid";
+ static final String SEGMENT_TXID_PARAM = "segmentTxId";
+
+ // TODO: create security tests
+ protected boolean isValidRequestor(String remoteUser, Configuration conf)
+ throws IOException {
+ if (remoteUser == null) { // This really shouldn't happen...
+ LOG.warn("Received null remoteUser while authorizing access to " +
+ "GetJournalEditServlet");
+ return false;
+ }
+
+ String[] validRequestors = {
+ SecurityUtil.getServerPrincipal(conf
+ .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
+ .getAddress(conf).getHostName()),
+ SecurityUtil.getServerPrincipal(conf
+ .get(DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY),
+ NameNode.getAddress(conf).getHostName()) };
+ // TODO: above principal is not correct, since each JN will have a
+ // different hostname.
+
+ for (String v : validRequestors) {
+ if (v != null && v.equals(remoteUser)) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("isValidRequestor is allowing: " + remoteUser);
+ return true;
+ }
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("isValidRequestor is rejecting: " + remoteUser);
+ return false;
+ }
+
+ private boolean checkRequestorOrSendError(Configuration conf,
+ HttpServletRequest request, HttpServletResponse response)
+ throws IOException {
+ if (UserGroupInformation.isSecurityEnabled()
+ && !isValidRequestor(request.getRemoteUser(), conf)) {
+ response.sendError(HttpServletResponse.SC_FORBIDDEN,
+ "Only Namenode and another JournalNode may access this servlet");
+ LOG.warn("Received non-NN/JN request for edits from "
+ + request.getRemoteHost());
+ return false;
+ }
+ return true;
+ }
+
+ private boolean checkStorageInfoOrSendError(JNStorage storage,
+ HttpServletRequest request, HttpServletResponse response)
+ throws IOException {
+ String myStorageInfoString = storage.toColonSeparatedString();
+ String theirStorageInfoString = request.getParameter(STORAGEINFO_PARAM);
+
+ if (theirStorageInfoString != null
+ && !myStorageInfoString.equals(theirStorageInfoString)) {
+ response.sendError(HttpServletResponse.SC_FORBIDDEN,
+ "This node has storage info " + myStorageInfoString
+ + " but the requesting node expected "
+ + theirStorageInfoString);
+ LOG.warn("Received an invalid request file transfer request "
+ + " with storage info " + theirStorageInfoString);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void doGet(final HttpServletRequest request,
+ final HttpServletResponse response) throws ServletException, IOException {
+ try {
+      final ServletContext context = getServletContext();
+      final Configuration conf = (Configuration) context
+          .getAttribute(JspHelper.CURRENT_CONF);
+ final String journalId = request.getParameter(JOURNAL_ID_PARAM);
+ QuorumJournalManager.checkJournalId(journalId);
+ final JNStorage storage = JournalNodeHttpServer
+ .getJournalFromContext(context, journalId).getStorage();
+
+ // Check security
+ if (!checkRequestorOrSendError(conf, request, response)) {
+ return;
+ }
+
+ // Check that the namespace info is correct
+ if (!checkStorageInfoOrSendError(storage, request, response)) {
+ return;
+ }
+
+ long segmentTxId = ServletUtil.parseLongParam(request,
+ SEGMENT_TXID_PARAM);
+
+ FileJournalManager fjm = storage.getJournalManager();
+ File editFile;
+ FileInputStream editFileIn;
+
+ synchronized (fjm) {
+ // Synchronize on the FJM so that the file doesn't get finalized
+ // out from underneath us while we're in the process of opening
+ // it up.
+ EditLogFile elf = fjm.getLogFile(
+ segmentTxId);
+ if (elf == null) {
+ response.sendError(HttpServletResponse.SC_NOT_FOUND,
+ "No edit log found starting at txid " + segmentTxId);
+ return;
+ }
+ editFile = elf.getFile();
+ GetImageServlet.setVerificationHeaders(response, editFile);
+ GetImageServlet.setFileNameHeaders(response, editFile);
+ editFileIn = new FileInputStream(editFile);
+ }
+
+ DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
+
+ // send edits
+ TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
+
+ } catch (Throwable t) {
+ String errMsg = "getedit failed. " + StringUtils.stringifyException(t);
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg);
+ throw new IOException(errMsg);
+ }
+ }
+
+ public static String buildPath(String journalId, long segmentTxId,
+ NamespaceInfo nsInfo) {
+ StringBuilder path = new StringBuilder("/getJournal?");
+ try {
+ path.append(JOURNAL_ID_PARAM).append("=")
+ .append(URLEncoder.encode(journalId, "UTF-8"));
+ path.append("&" + SEGMENT_TXID_PARAM).append("=")
+ .append(segmentTxId);
+ path.append("&" + STORAGEINFO_PARAM).append("=")
+ .append(URLEncoder.encode(nsInfo.toColonSeparatedString(), "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ // Never get here -- everyone supports UTF-8
+ throw new RuntimeException(e);
+ }
+ return path.toString();
+ }
+}
\ No newline at end of file
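As a usage sketch, buildPath() above is what a reader combines with a JournalNode's HTTP address to fetch a segment. The host, port, journal id, and starting txid below are illustrative stand-ins; nsInfo would come from the NameNode handshake.

    import java.net.URL;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    class FetchUrlSketch {
      // The resulting URL looks like:
      //   http://jn1.example.com:8480/getJournal?jid=mycluster
      //     &segmentTxId=1&storageInfo=<colon-separated storage info>
      static URL editsUrl(NamespaceInfo nsInfo) throws Exception {
        String path = GetJournalEditServlet.buildPath("mycluster", 1L, nsInfo);
        return new URL("http", "jn1.example.com", 8480, path);
      }
    }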
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/**
+ * A {@link Storage} implementation for the {@link JournalNode}.
+ *
+ * The JN has a storage directory for each namespace for which it stores
+ * metadata; in the current design there is only a single such directory
+ * per JN.
+ */
+class JNStorage extends Storage {
+
+ private final FileJournalManager fjm;
+ private final StorageDirectory sd;
+ private boolean lazyInitted = false;
+
+ /**
+ * @param logDir the path to the directory in which data will be stored
+ * @param errorReporter a callback to report errors
+ */
+ protected JNStorage(File logDir, StorageErrorReporter errorReporter) {
+ super(NodeType.JOURNAL_NODE);
+
+ sd = new StorageDirectory(logDir);
+ this.addStorageDir(sd);
+ this.fjm = new FileJournalManager(sd, errorReporter);
+ }
+
+ FileJournalManager getJournalManager() {
+ return fjm;
+ }
+
+ @Override
+ public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
+ return false;
+ }
+
+ /**
+ * Find an edits file spanning the given transaction ID range.
+ * If no such file exists, an exception is thrown.
+ */
+ File findFinalizedEditsFile(long startTxId, long endTxId) throws IOException {
+ File ret = new File(sd.getCurrentDir(),
+ NNStorage.getFinalizedEditsFileName(startTxId, endTxId));
+ if (!ret.exists()) {
+ throw new IOException(
+ "No edits file for range " + startTxId + "-" + endTxId);
+ }
+ return ret;
+ }
+
+ /**
+ * @return the path for an in-progress edits file starting at the given
+ * transaction ID. This does not verify existence of the file.
+ */
+ File getInProgressEditLog(long startTxId) {
+ return new File(sd.getCurrentDir(),
+ NNStorage.getInProgressEditsFileName(startTxId));
+ }
+
+ /**
+ * @return the path for the file which contains persisted data for the
+ * paxos-like recovery process for the given log segment.
+ */
+ File getPaxosFile(long segmentTxId) {
+ return new File(getPaxosDir(), String.valueOf(segmentTxId));
+ }
+
+ private File getPaxosDir() {
+ return new File(sd.getCurrentDir(), "paxos");
+ }
+
+ void format(NamespaceInfo nsInfo) throws IOException {
+ setStorageInfo(nsInfo);
+ LOG.info("Formatting journal storage directory " +
+ sd + " with nsid: " + getNamespaceID());
+ sd.clearDirectory();
+ writeProperties(sd);
+ if (!getPaxosDir().mkdirs()) {
+ throw new IOException("Could not create paxos dir: " + getPaxosDir());
+ }
+ }
+
+ void analyzeStorage(NamespaceInfo nsInfo) throws IOException {
+ if (lazyInitted) {
+ checkConsistentNamespace(nsInfo);
+ return;
+ }
+
+ StorageState state = sd.analyzeStorage(StartupOption.REGULAR, this);
+ switch (state) {
+ case NON_EXISTENT:
+ case NOT_FORMATTED:
+ format(nsInfo);
+ // In the NORMAL case below, analyzeStorage() has already locked the
+ // directory for us. But in the case that we format it, we have to
+ // lock it here.
+ // The directory is unlocked in close() when the node shuts down.
+ sd.lock();
+ break;
+ case NORMAL:
+ // Storage directory is already locked by analyzeStorage() - no
+ // need to lock it here.
+ readProperties(sd);
+ checkConsistentNamespace(nsInfo);
+ break;
+
+ default:
+ LOG.warn("TODO: unhandled state for storage dir " + sd + ": " + state);
+ }
+ lazyInitted = true;
+ }
+
+ private void checkConsistentNamespace(NamespaceInfo nsInfo)
+ throws IOException {
+ if (nsInfo.getNamespaceID() != getNamespaceID()) {
+ throw new IOException("Incompatible namespaceID for journal " +
+ this.sd + ": NameNode has nsId " + nsInfo.getNamespaceID() +
+ " but storage has nsId " + getNamespaceID());
+ }
+
+ if (!nsInfo.getClusterID().equals(getClusterID())) {
+ throw new IOException("Incompatible clusterID for journal " +
+ this.sd + ": NameNode has clusterId '" + nsInfo.getClusterID() +
+ "' but storage has clusterId '" + getClusterID() + "'");
+
+ }
+ }
+
+ public void close() throws IOException {
+ LOG.info("Closing journal storage for " + sd);
+ unlockAll();
+ }
+}
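A short sketch of the on-disk names these helpers resolve to, following the NNStorage file-name conventions; the transaction ids are made up for illustration, and JNStorage is package-private, so this assumes same-package code such as a test.

    import java.io.File;
    import java.io.IOException;

    class JNStorageLayoutSketch {
      static void show(JNStorage storage) throws IOException {
        // <logDir>/current/edits_0000000000000000001-0000000000000000100
        // (throws IOException if no such finalized file exists)
        File finalized = storage.findFinalizedEditsFile(1, 100);
        // <logDir>/current/edits_inprogress_0000000000000000101
        // (existence is not checked)
        File open = storage.getInProgressEditLog(101);
        // <logDir>/current/paxos/101 -- persisted recovery decision for
        // the segment starting at txid 101
        File paxos = storage.getPaxosFile(101);
      }
    }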
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.TextFormat;
+
+/**
+ * A JournalNode can manage journals for several clusters at once.
+ * Each such journal is entirely independent despite being hosted by
+ * the same JVM.
+ */
+class Journal implements Closeable {
+ static final Log LOG = LogFactory.getLog(Journal.class);
+
+
+ // Current writing state
+ private EditLogOutputStream curSegment;
+ private long curSegmentTxId = HdfsConstants.INVALID_TXID;
+ private long nextTxId = HdfsConstants.INVALID_TXID;
+
+ private final JNStorage storage;
+
+ /**
+ * When a new writer comes along, it asks each node to promise
+ * to ignore requests from any previous writer, as identified
+ * by epoch number. In order to make such a promise, the epoch
+ * number of that writer is stored persistently on disk.
+ */
+ private PersistentLongFile lastPromisedEpoch;
+ private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
+
+ private final FileJournalManager fjm;
+
+ Journal(File logDir, StorageErrorReporter errorReporter) {
+ storage = new JNStorage(logDir, errorReporter);
+
+ File currentDir = storage.getSingularStorageDir().getCurrentDir();
+ this.lastPromisedEpoch = new PersistentLongFile(
+ new File(currentDir, LAST_PROMISED_FILENAME), 0);
+
+ this.fjm = storage.getJournalManager();
+ }
+
+ /**
+ * Iterate over the edit logs stored locally, and set
+ * {@link #curSegmentTxId} to refer to the most recently written
+ * one.
+ */
+ private synchronized void scanStorage() throws IOException {
+ if (!fjm.getStorageDirectory().getCurrentDir().exists()) {
+ return;
+ }
+ LOG.info("Scanning storage " + fjm);
+ List<EditLogFile> files = fjm.getLogFiles(0);
+ if (!files.isEmpty()) {
+ EditLogFile latestLog = files.get(files.size() - 1);
+ LOG.info("Latest log is " + latestLog);
+ curSegmentTxId = latestLog.getFirstTxId();
+ }
+ }
+
+ /**
+ * Format the local storage with the given namespace.
+ */
+ void format(NamespaceInfo nsInfo) throws IOException {
+ Preconditions.checkState(nsInfo.getNamespaceID() != 0,
+ "can't format with uninitialized namespace info: %s",
+ nsInfo);
+ storage.format(nsInfo);
+ }
+
+ /**
+ * Unlock and release resources.
+ */
+ @Override // Closeable
+ public void close() throws IOException {
+ storage.close();
+ }
+
+ JNStorage getStorage() {
+ return storage;
+ }
+
+ /**
+ * @return the last epoch which this node has promised not to accept
+ * any lower epoch, or 0 if no promises have been made.
+ */
+ synchronized long getLastPromisedEpoch() throws IOException {
+ return lastPromisedEpoch.get();
+ }
+
+ /**
+ * Try to create a new epoch for this journal.
+ * @param nsInfo the namespace, which is verified for consistency or used to
+ * format, if the Journal has not yet been written to.
+ * @param epoch the epoch to start
+ * @return the status information necessary to begin recovery
+ * @throws IOException if the node has already made a promise to another
+ * writer with a higher epoch number, if the namespace is inconsistent,
+ * or if a disk error occurs.
+ */
+ synchronized NewEpochResponseProto newEpoch(
+ NamespaceInfo nsInfo, long epoch) throws IOException {
+
+ // If the storage is unformatted, format it with this NS.
+ // Otherwise, check that the NN's nsinfo matches the storage.
+ storage.analyzeStorage(nsInfo);
+
+ if (epoch <= getLastPromisedEpoch()) {
+ throw new IOException("Proposed epoch " + epoch + " <= last promise " +
+ getLastPromisedEpoch());
+ }
+
+ lastPromisedEpoch.set(epoch);
+ if (curSegment != null) {
+ curSegment.close();
+ curSegment = null;
+ }
+
+ NewEpochResponseProto.Builder builder =
+ NewEpochResponseProto.newBuilder();
+
+ // TODO: we only need to do this once, not on writer switchover.
+ scanStorage();
+
+ if (curSegmentTxId != HdfsConstants.INVALID_TXID) {
+ builder.setLastSegmentTxId(curSegmentTxId);
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Write a batch of edits to the journal.
+   * @see QJournalProtocol#journal(RequestInfo, long, int, byte[])
+ */
+ synchronized void journal(RequestInfo reqInfo, long firstTxnId,
+ int numTxns, byte[] records) throws IOException {
+ checkRequest(reqInfo);
+
+ // TODO: if a JN goes down and comes back up, then it will throw
+ // this exception on every edit. We should instead send back
+ // a response indicating the log needs to be rolled, which would
+ // mark the logger on the client side as "pending" -- and have the
+ // NN code look for this condition and trigger a roll when it happens.
+ // That way the node can catch back up and rejoin
+ Preconditions.checkState(curSegment != null,
+ "Can't write, no segment open");
+ Preconditions.checkState(nextTxId == firstTxnId,
+ "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing txid " + firstTxnId + "-" + (firstTxnId + numTxns - 1));
+ }
+ curSegment.writeRaw(records, 0, records.length);
+ curSegment.setReadyToFlush();
+ curSegment.flush();
+ nextTxId += numTxns;
+ }
+
+ /**
+ * Ensure that the given request is coming from the correct writer and in-order.
+ * @param reqInfo the request info
+ * @throws IOException if the request is invalid.
+ */
+ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
+ // Invariant 25 from ZAB paper
+ if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
+ throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
+ " is less than the last promised epoch " +
+ lastPromisedEpoch.get());
+ }
+
+ // TODO: should other requests check the _exact_ epoch instead of
+ // the <= check? <= should probably only be necessary for the
+ // first calls
+
+ // TODO: some check on serial number that they only increase from a given
+ // client
+ }
+
+ /**
+ * Start a new segment at the given txid. The previous segment
+ * must have already been finalized.
+ */
+ public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
+ throws IOException {
+ assert fjm != null;
+ checkRequest(reqInfo);
+
+ Preconditions.checkState(curSegment == null,
+ "Can't start a log segment, already writing " + curSegment);
+ Preconditions.checkState(nextTxId == txid || nextTxId == HdfsConstants.INVALID_TXID,
+ "Can't start log segment " + txid + " expecting nextTxId=" + nextTxId);
+ curSegment = fjm.startLogSegment(txid);
+ curSegmentTxId = txid;
+ nextTxId = txid;
+ }
+
+ /**
+ * Finalize the log segment at the given transaction ID.
+ */
+ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+ long endTxId) throws IOException {
+ checkRequest(reqInfo);
+
+ if (startTxId == curSegmentTxId) {
+ if (curSegment != null) {
+ curSegment.close();
+ curSegment = null;
+ }
+ }
+
+ FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
+ if (elf.isInProgress()) {
+ // TODO: this is slow to validate when in non-recovery cases
+ // we already know the length here!
+
+ LOG.info("Validating log about to be finalized: " + elf);
+ elf.validateLog();
+
+ Preconditions.checkState(elf.getLastTxId() == endTxId,
+ "Trying to finalize log %s-%s, but current state of log" +
+ "is %s", startTxId, endTxId, elf);
+ fjm.finalizeLogSegment(startTxId, endTxId);
+ } else {
+ Preconditions.checkArgument(endTxId == elf.getLastTxId(),
+ "Trying to re-finalize already finalized log " +
+ elf + " with different endTxId " + endTxId);
+ }
+ }
+
+ /**
+ * @see QJournalProtocol#getEditLogManifest(String, long)
+ */
+ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+ throws IOException {
+ // TODO: check fencing info?
+ RemoteEditLogManifest manifest = new RemoteEditLogManifest(
+ fjm.getRemoteEditLogs(sinceTxId));
+ return manifest;
+ }
+
+ /**
+ * @return the current state of the given segment, or null if the
+ * segment does not exist.
+ */
+ private SegmentStateProto getSegmentInfo(long segmentTxId)
+ throws IOException {
+ EditLogFile elf = fjm.getLogFile(segmentTxId);
+ if (elf == null) {
+ return null;
+ }
+ if (elf.isInProgress()) {
+ elf.validateLog();
+ }
+ if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
+ // no transactions in file
+ throw new AssertionError("TODO: no transactions in file " +
+ elf);
+ }
+ SegmentStateProto ret = SegmentStateProto.newBuilder()
+ .setStartTxId(segmentTxId)
+ .setEndTxId(elf.getLastTxId())
+ .setIsInProgress(elf.isInProgress())
+ .setMd5Sum(ByteString.EMPTY) // TODO
+ .build();
+ LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
+ TextFormat.shortDebugString(ret));
+ return ret;
+ }
+
+ /**
+ * @see QJournalProtocol#prepareRecovery(RequestInfo, long)
+ */
+ public synchronized PrepareRecoveryResponseProto prepareRecovery(
+ RequestInfo reqInfo, long segmentTxId) throws IOException {
+ checkRequest(reqInfo);
+
+ PrepareRecoveryResponseProto.Builder builder =
+ PrepareRecoveryResponseProto.newBuilder();
+
+ PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
+ if (previouslyAccepted != null) {
+ builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
+ .setSegmentState(previouslyAccepted.getSegmentState());
+ } else {
+ SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
+ if (segInfo != null) {
+ builder.setSegmentState(segInfo);
+ }
+ }
+
+ PrepareRecoveryResponseProto resp = builder.build();
+ LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
+ TextFormat.shortDebugString(resp));
+ return resp;
+ }
+
+ /**
+ * @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
+ */
+ public synchronized void acceptRecovery(RequestInfo reqInfo,
+ SegmentStateProto segment, URL fromUrl)
+ throws IOException {
+ checkRequest(reqInfo);
+ long segmentTxId = segment.getStartTxId();
+
+ // TODO: right now, a recovery of a segment when the log is
+    // completely empty (i.e. startLogSegment() was called but no txns)
+ // will fail this assertion here, since endTxId < startTxId
+ Preconditions.checkArgument(segment.getEndTxId() > 0 &&
+ segment.getEndTxId() >= segmentTxId,
+ "bad recovery state for segment %s: %s",
+ segmentTxId, TextFormat.shortDebugString(segment));
+
+ PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
+ PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
+ .setAcceptedInEpoch(reqInfo.getEpoch())
+ .setSegmentState(segment)
+ .build();
+ if (oldData != null) {
+ Preconditions.checkState(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
+ "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
+ oldData, newData);
+ }
+
+ SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
+ // TODO: this can be null, in the case that one of the loggers started
+ // the next segment, but others did not! add regression test and null
+ // check in next condition below.
+
+ // TODO: what if they have the same length but one is finalized and the
+ // other isn't! cover that case.
+ if (currentSegment.getEndTxId() != segment.getEndTxId()) {
+ syncLog(reqInfo, segment, fromUrl);
+ } else {
+ LOG.info("Skipping download of log " +
+ TextFormat.shortDebugString(segment) +
+ ": already have up-to-date logs");
+ }
+
+ // TODO: is it OK that this is non-atomic?
+ // we might be left with an older epoch recorded, but a newer log
+
+ persistPaxosData(segmentTxId, newData);
+ LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
+ TextFormat.shortDebugString(newData));
+ }
+
+ /**
+ * Synchronize a log segment from another JournalNode.
+ * @param reqInfo the request info for the recovery IPC
+   * @param segment the expected state of the segment being synchronized
+   * @param url the URL of the remote JournalNode from which to fetch it
+   * @throws IOException if the transfer or rename fails
+ */
+ private void syncLog(RequestInfo reqInfo,
+ SegmentStateProto segment, URL url) throws IOException {
+ String tmpFileName =
+ "synclog_" + segment.getStartTxId() + "_" +
+ reqInfo.getEpoch() + "." + reqInfo.getIpcSerialNumber();
+
+ List<File> localPaths = storage.getFiles(null, tmpFileName);
+ assert localPaths.size() == 1;
+ File tmpFile = localPaths.get(0);
+
+ boolean success = false;
+
+ LOG.info("Synchronizing log " +
+ TextFormat.shortDebugString(segment) + " from " + url);
+ TransferFsImage.doGetUrl(url, localPaths, storage, true);
+ assert tmpFile.exists();
+ try {
+ success = tmpFile.renameTo(storage.getInProgressEditLog(
+ segment.getStartTxId()));
+ if (success) {
+ // If we're synchronizing the latest segment, update our cached
+ // info.
+ // TODO: can this be done more generally?
+ if (curSegmentTxId == segment.getStartTxId()) {
+ nextTxId = segment.getEndTxId() + 1;
+ }
+ }
+ } finally {
+ if (!success) {
+ if (!tmpFile.delete()) {
+ LOG.warn("Failed to delete temporary file " + tmpFile);
+ }
+ }
+ }
+ }
+
+ /**
+ * Retrieve the persisted data for recovering the given segment from disk.
+ */
+ private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId)
+ throws IOException {
+ File f = storage.getPaxosFile(segmentTxId);
+ if (!f.exists()) {
+ // Default instance has no fields filled in (they're optional)
+ return null;
+ }
+
+ InputStream in = new FileInputStream(f);
+ try {
+ PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
+ Preconditions.checkState(ret != null &&
+ ret.getSegmentState().getStartTxId() == segmentTxId,
+ "Bad persisted data for segment %s: %s",
+ segmentTxId, ret);
+ return ret;
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+
+ /**
+ * Persist data for recovering the given segment from disk.
+ */
+ private void persistPaxosData(long segmentTxId,
+ PersistedRecoveryPaxosData newData) throws IOException {
+ File f = storage.getPaxosFile(segmentTxId);
+ boolean success = false;
+ AtomicFileOutputStream fos = new AtomicFileOutputStream(f);
+ try {
+ newData.writeDelimitedTo(fos);
+ fos.write('\n');
+ // Write human-readable data after the protobuf. This is only
+ // to assist in debugging -- it's not parsed at all.
+ OutputStreamWriter writer = new OutputStreamWriter(fos);
+
+ writer.write(String.valueOf(newData));
+ writer.write('\n');
+ writer.flush();
+
+ fos.flush();
+ success = true;
+ } finally {
+ if (success) {
+ IOUtils.closeStream(fos);
+ } else {
+ fos.abort();
+ }
+ }
+ }
+}
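Putting the pieces together, a hedged sketch of the single-writer happy path against one Journal. The class is package-private, so this assumes same-package code such as a test; the journal id, epoch, and txids are illustrative, and the records payload must actually contain numTxns serialized edits for finalize to validate.

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
    import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    class JournalWritePathSketch {
      static void run(File logDir, StorageErrorReporter reporter,
          NamespaceInfo nsInfo, byte[] records, int numTxns) throws IOException {
        Journal journal = new Journal(logDir, reporter);
        journal.format(nsInfo);
        journal.newEpoch(nsInfo, 1);             // promise: reject epochs < 1
        RequestInfo req = new RequestInfo("mycluster", 1, 0);
        journal.startLogSegment(req, 1);         // opens edits_inprogress_1
        journal.journal(req, 1, numTxns, records);   // append and flush batch
        journal.finalizeLogSegment(req, 1, numTxns); // seals edits_1-numTxns
        journal.close();
      }
    }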
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * The JournalNode is a daemon which allows namenodes using
+ * the QuorumJournalManager to log and retrieve edits stored
+ * remotely. It is a thin wrapper around a local edit log
+ * directory with the addition of facilities to participate
+ * in the quorum protocol.
+ */
+@InterfaceAudience.Private
+public class JournalNode implements Tool, Configurable {
+ public static final Log LOG = LogFactory.getLog(JournalNode.class);
+ private Configuration conf;
+ private JournalNodeRpcServer rpcServer;
+ private JournalNodeHttpServer httpServer;
+ private Map<String, Journal> journalsById = Maps.newHashMap();
+
+ static {
+ HdfsConfiguration.init();
+ }
+
+ /**
+ * When stopped, the daemon will exit with this code.
+ */
+ private int resultCode = 0;
+
+ synchronized Journal getOrCreateJournal(String jid) {
+ QuorumJournalManager.checkJournalId(jid);
+
+ Journal journal = journalsById.get(jid);
+ if (journal == null) {
+ File logDir = getLogDir(jid);
+ LOG.info("Initializing journal in directory " + logDir);
+ journal = new Journal(logDir, new ErrorReporter());
+ journalsById.put(jid, journal);
+ }
+
+ return journal;
+ }
+
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ start();
+ return join();
+ }
+
+ /**
+ * Start listening for edits via RPC.
+ */
+ public void start() throws IOException {
+ Preconditions.checkState(!isStarted(), "JN already running");
+
+ DefaultMetricsSystem.initialize("JournalNode");
+ JvmMetrics.create("JournalNode",
+ conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
+ DefaultMetricsSystem.instance());
+
+ httpServer = new JournalNodeHttpServer(conf, this);
+ httpServer.start();
+
+ rpcServer = new JournalNodeRpcServer(conf, this);
+ rpcServer.start();
+ }
+
+ public boolean isStarted() {
+ return rpcServer != null;
+ }
+
+ /**
+ * @return the address the IPC server is bound to
+ */
+ public InetSocketAddress getBoundIpcAddress() {
+ return rpcServer.getAddress();
+ }
+
+
+ public InetSocketAddress getBoundHttpAddress() {
+ return httpServer.getAddress();
+ }
+
+
+ /**
+ * Stop the daemon with the given status code
+ * @param rc the status code with which to exit (non-zero
+ * should indicate an error)
+ */
+ public void stop(int rc) {
+ this.resultCode = rc;
+
+ if (rpcServer != null) {
+ rpcServer.stop();
+ }
+
+ if (httpServer != null) {
+ try {
+ httpServer.stop();
+ } catch (IOException ioe) {
+ LOG.warn("Unable to stop HTTP server for " + this, ioe);
+ }
+ }
+
+ for (Journal j : journalsById.values()) {
+ IOUtils.cleanup(LOG, j);
+ }
+ }
+
+ /**
+ * Wait for the daemon to exit.
+ * @return the result code (non-zero if error)
+ */
+ int join() throws InterruptedException {
+ if (rpcServer != null) {
+ rpcServer.join();
+ }
+ return resultCode;
+ }
+
+ public void stopAndJoin(int rc) throws InterruptedException {
+ stop(rc);
+ join();
+ }
+
+ /**
+ * Return the directory inside our configured storage
+ * dir which corresponds to a given journal.
+ * @param jid the journal identifier
+ * @return the file, which may or may not exist yet
+ */
+ private File getLogDir(String jid) {
+ String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+ DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
+ Preconditions.checkArgument(jid != null &&
+ !jid.isEmpty(),
+ "bad journal identifier: %s", jid);
+ return new File(new File(dir), jid);
+ }
+
+
+ private class ErrorReporter implements StorageErrorReporter {
+ @Override
+ public void reportErrorOnFile(File f) {
+ LOG.fatal("Error reported on file " + f + "... exiting",
+ new Exception());
+ stop(1);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new JournalNode(), args));
+ }
+}
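A minimal sketch of embedding the daemon, e.g. from a test; the edits directory value is illustrative.

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class JournalNodeEmbedSketch {
      static void run() throws Exception {
        Configuration conf = new Configuration();
        conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/tmp/jn-edits");
        JournalNode jn = new JournalNode();
        jn.setConf(conf);
        jn.start();                    // brings up the HTTP then RPC servers
        InetSocketAddress ipc = jn.getBoundIpcAddress();
        // ... drive QJournalProtocol RPCs against 'ipc' here ...
        jn.stopAndJoin(0);             // stop servers, close journals, wait
      }
    }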
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.servlet.ServletContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Encapsulates the HTTP server started by the JournalNode.
+ */
+@InterfaceAudience.Private
+public class JournalNodeHttpServer {
+ public static final Log LOG = LogFactory.getLog(
+ JournalNodeHttpServer.class);
+
+ public static final String JN_ATTRIBUTE_KEY = "localjournal";
+
+ private HttpServer httpServer;
+ private int infoPort;
+ private JournalNode localJournalNode;
+
+ private final Configuration conf;
+
+ JournalNodeHttpServer(Configuration conf, JournalNode jn) {
+ this.conf = conf;
+ this.localJournalNode = jn;
+ }
+
+ void start() throws IOException {
+ final InetSocketAddress bindAddr = getAddress(conf);
+
+ // Initialize the web server that serves edit log segments for download.
+ LOG.info("Starting web server as: "
+ + UserGroupInformation.getCurrentUser().getUserName());
+
+ int tmpInfoPort = bindAddr.getPort();
+ httpServer = new HttpServer("journal", bindAddr.getHostName(),
+ tmpInfoPort, tmpInfoPort == 0, conf, new AccessControlList(conf
+ .get(DFS_ADMIN, " "))) {
+ {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ initSpnego(conf, DFS_JOURNALNODE_USER_NAME_KEY,
+ DFS_JOURNALNODE_KEYTAB_FILE_KEY);
+ }
+ }
+ };
+ httpServer.setAttribute(JN_ATTRIBUTE_KEY, localJournalNode);
+ httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
+ httpServer.addInternalServlet("getJournal", "/getJournal",
+ GetJournalEditServlet.class, true);
+ httpServer.start();
+
+ // The web-server port can be ephemeral... ensure we have the correct info
+ infoPort = httpServer.getPort();
+
+ LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort);
+ }
+
+ void stop() throws IOException {
+ if (httpServer != null) {
+ try {
+ httpServer.stop();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Return the actual address bound to by the running server.
+ */
+ public InetSocketAddress getAddress() {
+ InetSocketAddress addr = httpServer.getListenerAddress();
+ assert addr.getPort() != 0;
+ return addr;
+ }
+
+ private static InetSocketAddress getAddress(Configuration conf) {
+ String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
+ DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT);
+ return NetUtils.createSocketAddr(addr,
+ DFSConfigKeys.DFS_JOURNALNODE_HTTP_PORT_DEFAULT,
+ DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY);
+ }
+
+ public static Journal getJournalFromContext(ServletContext context, String jid) {
+ JournalNode jn = (JournalNode)context.getAttribute(JN_ATTRIBUTE_KEY);
+ return jn.getOrCreateJournal(jid);
+ }
+
+ public static Configuration getConfFromContext(ServletContext context) {
+ return (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ }
+}
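
start() above binds the web server on a possibly ephemeral port (a configured port of 0) and then reads the actual port back through httpServer.getPort(). A self-contained sketch of the same bind-then-query pattern, using a plain ServerSocket instead of Hadoop's HttpServer:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class EphemeralPortSketch {
  public static void main(String[] args) throws IOException {
    // Port 0 asks the OS for any free port, as the JN web server does
    // when its configured info port is 0.
    try (ServerSocket ss = new ServerSocket()) {
      ss.bind(new InetSocketAddress("localhost", 0));
      // Read the bound port back -- the analogue of httpServer.getPort().
      int infoPort = ss.getLocalPort();
      System.out.println("bound to localhost:" + infoPort);
    }
  }
}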
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.protobuf.BlockingService;
+
+class JournalNodeRpcServer implements QJournalProtocol {
+
+ private static final int HANDLER_COUNT = 5;
+ private JournalNode jn;
+ private Server server;
+
+ JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
+ this.jn = jn;
+
+ InetSocketAddress addr = getAddress(conf);
+ RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
+ ProtobufRpcEngine.class);
+ QJournalProtocolServerSideTranslatorPB translator =
+ new QJournalProtocolServerSideTranslatorPB(this);
+ BlockingService service = QJournalProtocolService
+ .newReflectiveBlockingService(translator);
+
+ this.server = RPC.getServer(
+ QJournalProtocolPB.class,
+ service, addr.getHostName(),
+ addr.getPort(), HANDLER_COUNT, false, conf,
+ null /*secretManager*/);
+ }
+
+ void start() {
+ this.server.start();
+ }
+
+ public InetSocketAddress getAddress() {
+ return server.getListenerAddress();
+ }
+
+ void join() throws InterruptedException {
+ this.server.join();
+ }
+
+ void stop() {
+ this.server.stop();
+ }
+
+ private static InetSocketAddress getAddress(Configuration conf) {
+ String addr = conf.get(
+ DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
+ DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT);
+ return NetUtils.createSocketAddr(addr, 0,
+ DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY);
+ }
+
+ @Override
+ public GetJournalStateResponseProto getJournalState(String journalId)
+ throws IOException {
+ long epoch = jn.getOrCreateJournal(journalId).getLastPromisedEpoch();
+ return GetJournalStateResponseProto.newBuilder()
+ .setLastPromisedEpoch(epoch)
+ .setHttpPort(jn.getBoundHttpAddress().getPort())
+ .build();
+ }
+
+ @Override
+ public NewEpochResponseProto newEpoch(String journalId,
+ NamespaceInfo nsInfo,
+ long epoch) throws IOException {
+ return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
+ }
+
+
+ @Override
+ public void journal(RequestInfo reqInfo, long firstTxnId,
+ int numTxns, byte[] records) throws IOException {
+ jn.getOrCreateJournal(reqInfo.getJournalId())
+ .journal(reqInfo, firstTxnId, numTxns, records);
+ }
+
+ @Override
+ public void startLogSegment(RequestInfo reqInfo, long txid)
+ throws IOException {
+ jn.getOrCreateJournal(reqInfo.getJournalId())
+ .startLogSegment(reqInfo, txid);
+ }
+
+ @Override
+ public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+ long endTxId) throws IOException {
+ jn.getOrCreateJournal(reqInfo.getJournalId())
+ .finalizeLogSegment(reqInfo, startTxId, endTxId);
+ }
+
+ @Override
+ public GetEditLogManifestResponseProto getEditLogManifest(String jid,
+ long sinceTxId) throws IOException {
+
+ RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
+ .getEditLogManifest(sinceTxId);
+
+ return GetEditLogManifestResponseProto.newBuilder()
+ .setManifest(PBHelper.convert(manifest))
+ .setHttpPort(jn.getBoundHttpAddress().getPort())
+ .build();
+ }
+
+ @Override
+ public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+ long segmentTxId) throws IOException {
+ return jn.getOrCreateJournal(reqInfo.getJournalId())
+ .prepareRecovery(reqInfo, segmentTxId);
+ }
+
+ @Override
+ public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log,
+ URL fromUrl) throws IOException {
+ jn.getOrCreateJournal(reqInfo.getJournalId())
+ .acceptRecovery(reqInfo, log, fromUrl);
+ }
+
+}
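
Each RPC method above resolves the target Journal through jn.getOrCreateJournal(...) and then delegates to it. The sketch below illustrates that lazy per-journal dispatch; the ConcurrentHashMap-backed implementation is an assumption about JournalNode's internals (suggested by the journalsById field seen earlier), and the Journal class here is a stand-in.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class JournalDispatchSketch {
  // Minimal stand-in for the per-journal state a JournalNode manages.
  static class Journal {
    final String jid;
    Journal(String jid) { this.jid = jid; }
    void startLogSegment(long txid) {
      System.out.println(jid + ": starting segment at txid " + txid);
    }
  }

  private final Map<String, Journal> journalsById = new ConcurrentHashMap<>();

  // Analogue of jn.getOrCreateJournal(jid): create the Journal lazily on
  // first use, then route every subsequent RPC for that id to it.
  Journal getOrCreateJournal(String jid) {
    return journalsById.computeIfAbsent(jid, Journal::new);
  }

  public static void main(String[] args) {
    JournalDispatchSketch jn = new JournalDispatchSketch();
    jn.getOrCreateJournal("myjournal").startLogSegment(1);
    jn.getOrCreateJournal("myjournal").startLogSegment(42);
  }
}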
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Jul 20 00:25:50 2012
@@ -39,7 +39,8 @@ public final class HdfsServerConstants {
*/
static public enum NodeType {
NAME_NODE,
- DATA_NODE;
+ DATA_NODE,
+ JOURNAL_NODE;
}
/** Startup options */
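
A hypothetical caller might branch on the new constant as below; the enum is re-declared locally so the sketch compiles on its own, and the descriptions are illustrative only.

public class NodeTypeSketch {
  // Local mirror of HdfsServerConstants.NodeType after this change.
  enum NodeType { NAME_NODE, DATA_NODE, JOURNAL_NODE }

  static String storageDescription(NodeType type) {
    switch (type) {
      case NAME_NODE:    return "fsimage and edits storage";
      case DATA_NODE:    return "block storage";
      case JOURNAL_NODE: return "shared edit log storage";
      default:           throw new AssertionError(type);
    }
  }

  public static void main(String[] args) {
    System.out.println(storageDescription(NodeType.JOURNAL_NODE));
  }
}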
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Jul 20 00:25:50 2012
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.VersionInfo;
+import com.google.common.base.Preconditions;
+
/**
@@ -75,7 +77,7 @@ public abstract class Storage extends St
/** Layout versions of 0.20.203 release */
public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
- private static final String STORAGE_FILE_LOCK = "in_use.lock";
+ public static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
public static final String STORAGE_DIR_PREVIOUS = "previous";
@@ -719,6 +721,15 @@ public abstract class Storage extends St
return storageDirs.get(idx);
}
+ /**
+ * @return the storage directory, with the precondition that this storage
+ * has exactly one storage directory
+ */
+ public StorageDirectory getSingularStorageDir() {
+ Preconditions.checkState(storageDirs.size() == 1);
+ return storageDirs.get(0);
+ }
+
protected void addStorageDir(StorageDirectory sd) {
storageDirs.add(sd);
}
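
getSingularStorageDir() gives storages that are configured with exactly one directory a fail-fast accessor. A self-contained sketch of the same contract; the list-backed storage here is a simplification of Storage's storageDirs field.

import java.util.ArrayList;
import java.util.List;

public class SingularDirSketch {
  private final List<String> storageDirs = new ArrayList<>();

  // Same contract as getSingularStorageDir(): legal only when exactly
  // one directory is present, otherwise fail fast.
  String getSingularStorageDir() {
    if (storageDirs.size() != 1) {
      throw new IllegalStateException(
          "expected exactly one storage dir, found " + storageDirs.size());
    }
    return storageDirs.get(0);
  }

  public static void main(String[] args) {
    SingularDirSketch s = new SingularDirSketch();
    s.storageDirs.add("/data/jn/myjournal");
    System.out.println(s.getSingularStorageDir()); // /data/jn/myjournal
  }
}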
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Jul 20 00:25:50 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -34,7 +35,8 @@ import com.google.common.base.Preconditi
* to progress concurrently to flushes without allocating new buffers each
* time.
*/
-class EditsDoubleBuffer {
+@InterfaceAudience.Private
+public class EditsDoubleBuffer {
private TxnBuffer bufCurrent; // current buffer for writing
private TxnBuffer bufReady; // buffer ready for flushing
@@ -51,11 +53,11 @@ class EditsDoubleBuffer {
bufCurrent.writeOp(op);
}
- void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
bufCurrent.write(bytes, offset, length);
}
- void close() throws IOException {
+ public void close() throws IOException {
Preconditions.checkNotNull(bufCurrent);
Preconditions.checkNotNull(bufReady);
@@ -69,7 +71,7 @@ class EditsDoubleBuffer {
bufCurrent = bufReady = null;
}
- void setReadyToFlush() {
+ public void setReadyToFlush() {
assert isFlushed() : "previous data not flushed yet";
TxnBuffer tmp = bufReady;
bufReady = bufCurrent;
@@ -80,12 +82,12 @@ class EditsDoubleBuffer {
* Writes the content of the "ready" buffer to the given output stream,
* and resets it. Does not swap any buffers.
*/
- void flushTo(OutputStream out) throws IOException {
+ public void flushTo(OutputStream out) throws IOException {
bufReady.writeTo(out); // write data to file
bufReady.reset(); // erase all data in the buffer
}
- boolean shouldForceSync() {
+ public boolean shouldForceSync() {
return bufCurrent.size() >= initBufferSize;
}
@@ -120,6 +122,12 @@ class EditsDoubleBuffer {
return bufReady.numTxns;
}
+ /**
+ * @return the number of bytes that are ready to be flushed
+ */
+ public int countReadyBytes() {
+ return bufReady.size();
+ }
private static class TxnBuffer extends DataOutputBuffer {
long firstTxId;
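
The comment at the top of this class describes the point of the double buffer: writers keep appending to one buffer while the other is being flushed. A runnable sketch of that swap-and-flush cycle, including an analogue of the new countReadyBytes(); ByteArrayOutputStream stands in for TxnBuffer.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DoubleBufferSketch {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

  // Writers always append to the current buffer.
  void writeRaw(byte[] bytes) throws IOException {
    bufCurrent.write(bytes);
  }

  // Swap the two buffers: new writes land in the (now empty) current
  // buffer while the flusher drains the one that was just filled.
  void setReadyToFlush() {
    ByteArrayOutputStream tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

  // Analogue of countReadyBytes(): how much data the next flush moves.
  int countReadyBytes() {
    return bufReady.size();
  }

  void flushTo(OutputStream out) throws IOException {
    bufReady.writeTo(out);
    bufReady.reset();
  }

  public static void main(String[] args) throws IOException {
    DoubleBufferSketch db = new DoubleBufferSketch();
    db.writeRaw("edit-op-1".getBytes());
    db.setReadyToFlush();                 // swap buffers
    db.writeRaw("edit-op-2".getBytes());  // writes continue during flush
    System.out.println("ready bytes: " + db.countReadyBytes()); // 9
    db.flushTo(System.out);               // flushes "edit-op-1" only
  }
}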
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 20 00:25:50 2012
@@ -1136,6 +1136,7 @@ public class FSEditLog {
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {
// All journals have failed, it is handled in logSync.
+ // TODO: confirm that deferring to logSync's failure handling is safe here.
}
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Jul 20 00:25:50 2012
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.ComparisonChain;
@@ -50,7 +52,8 @@ import com.google.common.collect.Compari
* Note: this class is not thread-safe and should be externally
* synchronized.
*/
-class FileJournalManager implements JournalManager {
+@InterfaceAudience.Private
+public class FileJournalManager implements JournalManager {
private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
private final StorageDirectory sd;
@@ -147,7 +150,7 @@ class FileJournalManager implements Jour
* @return a list of remote edit logs
* @throws IOException if edit logs cannot be listed.
*/
- List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
+ public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@@ -165,6 +168,8 @@ class FileJournalManager implements Jour
}
}
+ Collections.sort(ret);
+
return ret;
}
@@ -178,7 +183,7 @@ class FileJournalManager implements Jour
* @throws IOException
* IOException thrown for invalid logDir
*/
- static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+ public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
return matchEditLogs(FileUtil.listFiles(logDir));
}
@@ -206,7 +211,7 @@ class FileJournalManager implements Jour
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
- new EditLogFile(f, startTxId, startTxId, true));
+ new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
@@ -304,7 +309,7 @@ class FileJournalManager implements Jour
}
}
- List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+ public List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<EditLogFile> logFiles = Lists.newArrayList();
@@ -320,6 +325,32 @@ class FileJournalManager implements Jour
return logFiles;
}
+
+ public EditLogFile getLogFile(long startTxId) throws IOException {
+ return getLogFile(sd.getCurrentDir(), startTxId);
+ }
+
+ public static EditLogFile getLogFile(File dir, long startTxId)
+ throws IOException {
+ List<EditLogFile> files = matchEditLogs(dir);
+ List<EditLogFile> ret = Lists.newLinkedList();
+ for (EditLogFile elf : files) {
+ if (elf.getFirstTxId() == startTxId) {
+ ret.add(elf);
+ }
+ }
+
+ if (ret.isEmpty()) {
+ // no matches
+ return null;
+ } else if (ret.size() == 1) {
+ return ret.get(0);
+ } else {
+ throw new IllegalStateException("More than one log segment in " +
+ dir + " starting at txid " + startTxId + ": " +
+ Joiner.on(", ").join(ret));
+ }
+ }
@Override
public String toString() {
@@ -329,7 +360,8 @@ class FileJournalManager implements Jour
/**
* Record of an edit log that has been located and had its filename parsed.
*/
- static class EditLogFile {
+ @InterfaceAudience.Private
+ public static class EditLogFile {
private File file;
private final long firstTxId;
private long lastTxId;
@@ -362,17 +394,20 @@ class FileJournalManager implements Jour
assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
assert file != null;
+ Preconditions.checkArgument(!isInProgress ||
+ lastTxId == HdfsConstants.INVALID_TXID);
+
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.file = file;
this.isInProgress = isInProgress;
}
- long getFirstTxId() {
+ public long getFirstTxId() {
return firstTxId;
}
- long getLastTxId() {
+ public long getLastTxId() {
return lastTxId;
}
@@ -385,17 +420,17 @@ class FileJournalManager implements Jour
* This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is.
*/
- void validateLog() throws IOException {
+ public void validateLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader();
}
- boolean isInProgress() {
+ public boolean isInProgress() {
return isInProgress;
}
- File getFile() {
+ public File getFile() {
return file;
}
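
The new getLogFile(dir, startTxId) has a three-way contract: null when no segment starts at the given txid, the unique match when exactly one does, and IllegalStateException when two segments claim the same start txid. A self-contained sketch of that contract over an in-memory list; EditLogFile here is a minimal stand-in for the class above.

import java.util.ArrayList;
import java.util.List;

public class GetLogFileSketch {
  static class EditLogFile {
    final long firstTxId;
    final String name;
    EditLogFile(long firstTxId, String name) {
      this.firstTxId = firstTxId;
      this.name = name;
    }
    @Override
    public String toString() { return name; }
  }

  // Mirrors the matching logic of FileJournalManager.getLogFile().
  static EditLogFile getLogFile(List<EditLogFile> files, long startTxId) {
    List<EditLogFile> ret = new ArrayList<>();
    for (EditLogFile elf : files) {
      if (elf.firstTxId == startTxId) {
        ret.add(elf);
      }
    }
    if (ret.isEmpty()) {
      return null;                 // no segment starts at this txid
    } else if (ret.size() == 1) {
      return ret.get(0);           // the unique match
    }
    throw new IllegalStateException("More than one log segment starting " +
        "at txid " + startTxId + ": " + ret);
  }

  public static void main(String[] args) {
    List<EditLogFile> files = new ArrayList<>();
    files.add(new EditLogFile(1, "edits_1-100"));
    files.add(new EditLogFile(101, "edits_inprogress_101"));
    System.out.println(getLogFile(files, 101)); // edits_inprogress_101
    System.out.println(getLogFile(files, 50));  // null
  }
}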
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Fri Jul 20 00:25:50 2012
@@ -229,6 +229,13 @@ public class JournalSet implements Journ
}
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
}
+ chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
+ }
+
+ public static void chainAndMakeRedundantStreams(
+ Collection<EditLogInputStream> outStreams,
+ PriorityQueue<EditLogInputStream> allStreams,
+ long fromTxId, boolean inProgressOk) {
// We want to group together all the streams that start on the same start
// transaction ID. To do this, we maintain an accumulator (acc) of all
// the streams we've seen at a given start transaction ID. When we see a
@@ -246,7 +253,7 @@ public class JournalSet implements Journ
if (accFirstTxId == elis.getFirstTxId()) {
acc.add(elis);
} else if (accFirstTxId < elis.getFirstTxId()) {
- streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+ outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) {
@@ -257,7 +264,7 @@ public class JournalSet implements Journ
}
}
if (!acc.isEmpty()) {
- streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+ outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
}
}
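
chainAndMakeRedundantStreams() groups streams that share a first txid into one RedundantEditLogInputStream, emitting a group each time the start txid advances. A self-contained sketch of that accumulator loop; single-element long[] arrays stand in for edit log input streams, and the descending-txid error branch is omitted for brevity.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class RedundantGroupingSketch {
  public static void main(String[] args) {
    // Streams ordered by first txid, as in the PriorityQueue above;
    // equal txids are redundant copies of the same segment.
    PriorityQueue<long[]> allStreams =
        new PriorityQueue<>(Comparator.comparingLong((long[] s) -> s[0]));
    allStreams.add(new long[] {1});   // copy A of the segment at txid 1
    allStreams.add(new long[] {1});   // copy B of the same segment
    allStreams.add(new long[] {101}); // the next segment

    // Accumulate streams sharing a start txid; emit one redundant group
    // whenever the start txid advances.
    List<List<long[]>> groups = new ArrayList<>();
    List<long[]> acc = new ArrayList<>();
    long[] elis;
    while ((elis = allStreams.poll()) != null) {
      if (acc.isEmpty() || acc.get(0)[0] == elis[0]) {
        acc.add(elis);
      } else {
        groups.add(new ArrayList<>(acc)); // one RedundantEditLogInputStream
        acc.clear();
        acc.add(elis);
      }
    }
    if (!acc.isEmpty()) {
      groups.add(acc);
    }
    System.out.println(groups.size() + " redundant groups"); // prints 2
  }
}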
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java Fri Jul 20 00:25:50 2012
@@ -41,6 +41,14 @@ final class NameNodeResourcePolicy {
static boolean areResourcesAvailable(
Collection<? extends CheckableNameNodeResource> resources,
int minimumRedundantResources) {
+
+ // TODO: remove this workaround:
+ // - during startup, if there are no edits dirs on disk, then
+ // areResourcesAvailable() is called with no resources at all, which
+ // previously caused the NN to enter safemode
+ if (resources.isEmpty()) {
+ return true;
+ }
int requiredResourceCount = 0;
int redundantResourceCount = 0;