Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/07/20 02:25:52 UTC

svn commit: r1363596 [2/3] - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs...

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.net.URL;
+
+/**
+ * Implementation for the protobuf service that forwards requests
+ * received on {@link QJournalProtocolPB} to the
+ * {@link QJournalProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolPB {
+  /** Server side implementation to delegate the requests to */
+  private final QJournalProtocol impl;
+
+  public QJournalProtocolServerSideTranslatorPB(QJournalProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public GetJournalStateResponseProto getJournalState(RpcController controller,
+      GetJournalStateRequestProto request) throws ServiceException {
+    try {
+      return impl.getJournalState(
+          convert(request.getJid()));
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  private String convert(JournalIdProto jid) {
+    return jid.getIdentifier();
+  }
+
+  @Override
+  public NewEpochResponseProto newEpoch(RpcController controller,
+      NewEpochRequestProto request) throws ServiceException {
+    try {
+      return impl.newEpoch(
+          request.getJid().getIdentifier(),
+          PBHelper.convert(request.getNsInfo()),
+          request.getEpoch());
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /** @see QJournalProtocol#journal */
+  @Override
+  public JournalResponseProto journal(RpcController unused,
+      JournalRequestProto req) throws ServiceException {
+    try {
+      impl.journal(convert(req.getReqInfo()),
+          req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
+              .toByteArray());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return JournalResponseProto.newBuilder().build();
+  }
+
+  /** @see QJournalProtocol#startLogSegment */
+  @Override
+  public StartLogSegmentResponseProto startLogSegment(RpcController controller,
+      StartLogSegmentRequestProto req) throws ServiceException {
+    try {
+      impl.startLogSegment(convert(req.getReqInfo()),
+          req.getTxid());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return StartLogSegmentResponseProto.newBuilder().build();
+  }
+  
+  @Override
+  public FinalizeLogSegmentResponseProto finalizeLogSegment(
+      RpcController controller, FinalizeLogSegmentRequestProto req)
+      throws ServiceException {
+    try {
+      impl.finalizeLogSegment(convert(req.getReqInfo()),
+          req.getStartTxId(), req.getEndTxId());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return FinalizeLogSegmentResponseProto.newBuilder().build();
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(
+      RpcController controller, GetEditLogManifestRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getEditLogManifest(
+          request.getJid().getIdentifier(),
+          request.getSinceTxId());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+
+  @Override
+  public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
+      PrepareRecoveryRequestProto request) throws ServiceException {
+    try {
+      return impl.prepareRecovery(convert(request.getReqInfo()),
+          request.getSegmentTxId());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public AcceptRecoveryResponseProto acceptRecovery(RpcController controller,
+      AcceptRecoveryRequestProto request) throws ServiceException {
+    try {
+      impl.acceptRecovery(convert(request.getReqInfo()),
+          request.getStateToAccept(),
+          new URL(request.getFromURL()));
+      return AcceptRecoveryResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  
+  private RequestInfo convert(
+      QJournalProtocolProtos.RequestInfoProto reqInfo) {
+    return new RequestInfo(
+        reqInfo.getJournalId().getIdentifier(),
+        reqInfo.getEpoch(),
+        reqInfo.getIpcSerialNumber());
+  }
+}
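
For context, the following sketch shows how a server-side translator of this kind is typically wired into Hadoop's protobuf RPC layer. It is illustrative only: the generated service class name (QJournalProtocolProtos.QJournalProtocolService) and the RPC.Builder API are assumptions based on the standard Hadoop PB-RPC pattern rather than something shown here; the real wiring presumably lives in JournalNodeRpcServer, which is added elsewhere in this change.

    package org.apache.hadoop.hdfs.qjournal.protocolPB;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;

    import com.google.protobuf.BlockingService;

    /** Sketch only: builds (but does not start) a protobuf RPC server for a journal impl. */
    public class QJournalRpcServerSketch {
      static RPC.Server build(Configuration conf, QJournalProtocol impl,
          String bindAddr, int port) throws IOException {
        // Use the protobuf RPC engine for the QJournalProtocolPB interface.
        RPC.setProtocolEngine(conf, QJournalProtocolPB.class, ProtobufRpcEngine.class);

        // Wrap the implementation in the server-side translator and register
        // the generated blocking service with the IPC server.
        QJournalProtocolServerSideTranslatorPB translator =
            new QJournalProtocolServerSideTranslatorPB(impl);
        BlockingService service =
            QJournalProtocolService.newReflectiveBlockingService(translator);

        return new RPC.Builder(conf)
            .setProtocol(QJournalProtocolPB.class)
            .setInstance(service)
            .setBindAddress(bindAddr)
            .setPort(port)
            .build();
      }
    }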

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client-side translator that translates requests made on
+ * the {@link QJournalProtocol} interface into RPC calls to a server
+ * implementing {@link QJournalProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
+    QJournalProtocol, Closeable {
+  /** RpcController is not used and hence is set to null */
+  private final static RpcController NULL_CONTROLLER = null;
+  private final QJournalProtocolPB rpcProxy;
+  
+  public QJournalProtocolTranslatorPB(QJournalProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public void close() {
+    RPC.stopProxy(rpcProxy);
+  }
+
+
+  @Override
+  public GetJournalStateResponseProto getJournalState(String jid)
+      throws IOException {
+    try {
+      GetJournalStateRequestProto req = GetJournalStateRequestProto.newBuilder()
+          .setJid(convertJournalId(jid))
+          .build();
+      return rpcProxy.getJournalState(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  private JournalIdProto convertJournalId(String jid) {
+    return JournalIdProto.newBuilder()
+        .setIdentifier(jid)
+        .build();
+  }
+
+  @Override
+  public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
+      long epoch) throws IOException {
+    try {
+      NewEpochRequestProto req = NewEpochRequestProto.newBuilder()
+        .setJid(convertJournalId(jid))
+        .setNsInfo(PBHelper.convert(nsInfo))
+        .setEpoch(epoch)
+        .build();
+      return rpcProxy.newEpoch(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void journal(RequestInfo reqInfo, long firstTxnId, int numTxns,
+      byte[] records) throws IOException {
+    JournalRequestProto req = JournalRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setFirstTxnId(firstTxnId)
+        .setNumTxns(numTxns)
+        .setRecords(PBHelper.getByteString(records))
+        .build();
+    try {
+      rpcProxy.journal(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  private QJournalProtocolProtos.RequestInfoProto convert(
+      RequestInfo reqInfo) {
+    return QJournalProtocolProtos.RequestInfoProto.newBuilder()
+      .setJournalId(convertJournalId(reqInfo.getJournalId()))
+      .setEpoch(reqInfo.getEpoch())
+      .setIpcSerialNumber(reqInfo.getIpcSerialNumber())
+      .build();
+  }
+
+  @Override
+  public void startLogSegment(RequestInfo reqInfo, long txid)
+      throws IOException {
+    StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setTxid(txid)
+        .build();
+    try {
+      rpcProxy.startLogSegment(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+  
+  @Override
+  public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+      long endTxId) throws IOException {
+    FinalizeLogSegmentRequestProto req =
+        FinalizeLogSegmentRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setStartTxId(startTxId)
+        .setEndTxId(endTxId)
+        .build();
+    try {
+      rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(String jid,
+      long sinceTxId) throws IOException {
+    try {
+      return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
+          GetEditLogManifestRequestProto.newBuilder()
+            .setJid(convertJournalId(jid))
+            .setSinceTxId(sinceTxId)
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+      long segmentTxId) throws IOException {
+    try {
+      return rpcProxy.prepareRecovery(NULL_CONTROLLER,
+          PrepareRecoveryRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .setSegmentTxId(segmentTxId)
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void acceptRecovery(RequestInfo reqInfo,
+      SegmentStateProto stateToAccept, URL fromUrl) throws IOException {
+    try {
+      rpcProxy.acceptRecovery(NULL_CONTROLLER,
+          AcceptRecoveryRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .setStateToAccept(stateToAccept)
+            .setFromURL(fromUrl.toExternalForm())
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  public boolean isMethodSupported(String methodName) throws IOException {
+    return RpcClientUtil.isMethodSupported(rpcProxy,
+        QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
+  }
+}
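
The inverse, client-side construction is sketched below: obtain a protobuf proxy for QJournalProtocolPB and wrap it in this translator so callers can program against the plain QJournalProtocol interface. This is a minimal sketch using the generic RPC.getProxy() call; the actual client in this branch may create its proxy differently (security, retry, and timeout settings are omitted).

    package org.apache.hadoop.hdfs.qjournal.protocolPB;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;

    /** Sketch only: obtain a QJournalProtocol client for one remote JournalNode. */
    public class QJournalClientSketch {
      static QJournalProtocol connect(Configuration conf, InetSocketAddress addr)
          throws IOException {
        RPC.setProtocolEngine(conf, QJournalProtocolPB.class, ProtobufRpcEngine.class);
        QJournalProtocolPB pbProxy = RPC.getProxy(QJournalProtocolPB.class,
            RPC.getProtocolVersion(QJournalProtocolPB.class), addr, conf);
        // The translator hides the protobuf request/response plumbing.
        return new QJournalProtocolTranslatorPB(pbProxy);
      }
    }

A caller would then invoke, for example, getJournalState("myjournal") and later close() the translator, which stops the underlying RPC proxy.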

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This servlet is used in two cases:
+ * <ul>
+ * <li>The QuorumJournalManager, when reading edits, fetches the edit streams
+ * from the journal nodes.</li>
+ * <li>During edits synchronization, one journal node will fetch edits from
+ * another journal node.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+public class GetJournalEditServlet extends HttpServlet {
+
+  private static final long serialVersionUID = -4635891628211723009L;
+  private static final Log LOG = LogFactory.getLog(GetJournalEditServlet.class);
+
+  static final String STORAGEINFO_PARAM = "storageInfo";
+  static final String JOURNAL_ID_PARAM = "jid";
+  static final String SEGMENT_TXID_PARAM = "segmentTxId";
+
+  // TODO: create security tests
+  protected boolean isValidRequestor(String remoteUser, Configuration conf)
+      throws IOException {
+    if (remoteUser == null) { // This really shouldn't happen...
+      LOG.warn("Received null remoteUser while authorizing access to " +
+          "GetJournalEditServlet");
+      return false;
+    }
+
+    String[] validRequestors = {
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
+            .getAddress(conf).getHostName()),
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY),
+            NameNode.getAddress(conf).getHostName()) };
+    // TODO: above principal is not correct, since each JN will have a
+    // different hostname.
+
+    for (String v : validRequestors) {
+      if (v != null && v.equals(remoteUser)) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("isValidRequestor is allowing: " + remoteUser);
+        return true;
+      }
+    }
+    if (LOG.isDebugEnabled())
+      LOG.debug("isValidRequestor is rejecting: " + remoteUser);
+    return false;
+  }
+  
+  private boolean checkRequestorOrSendError(Configuration conf,
+      HttpServletRequest request, HttpServletResponse response)
+          throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()
+        && !isValidRequestor(request.getRemoteUser(), conf)) {
+      response.sendError(HttpServletResponse.SC_FORBIDDEN,
+          "Only Namenode and another JournalNode may access this servlet");
+      LOG.warn("Received non-NN/JN request for edits from "
+          + request.getRemoteHost());
+      return false;
+    }
+    return true;
+  }
+  
+  private boolean checkStorageInfoOrSendError(JNStorage storage,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException {
+    String myStorageInfoString = storage.toColonSeparatedString();
+    String theirStorageInfoString = request.getParameter(STORAGEINFO_PARAM);
+    
+    if (theirStorageInfoString != null
+        && !myStorageInfoString.equals(theirStorageInfoString)) {
+      response.sendError(HttpServletResponse.SC_FORBIDDEN,
+              "This node has storage info " + myStorageInfoString
+                  + " but the requesting node expected "
+                  + theirStorageInfoString);
+      LOG.warn("Received an invalid request file transfer request "
+          + " with storage info " + theirStorageInfoString);
+      return false;
+    }
+    return true;
+  }
+  
+  @Override
+  public void doGet(final HttpServletRequest request,
+      final HttpServletResponse response) throws ServletException, IOException {
+    try {
+      final ServletContext context = getServletContext();
+      final Configuration conf = (Configuration) getServletContext()
+          .getAttribute(JspHelper.CURRENT_CONF);
+      final String journalId = request.getParameter(JOURNAL_ID_PARAM);
+      QuorumJournalManager.checkJournalId(journalId);
+      final JNStorage storage = JournalNodeHttpServer
+          .getJournalFromContext(context, journalId).getStorage();
+
+      // Check security
+      if (!checkRequestorOrSendError(conf, request, response)) {
+        return;
+      }
+
+      // Check that the namespace info is correct
+      if (!checkStorageInfoOrSendError(storage, request, response)) {
+        return;
+      }
+      
+      long segmentTxId = ServletUtil.parseLongParam(request,
+          SEGMENT_TXID_PARAM);
+
+      FileJournalManager fjm = storage.getJournalManager();
+      File editFile;
+      FileInputStream editFileIn;
+      
+      synchronized (fjm) {
+        // Synchronize on the FJM so that the file doesn't get finalized
+        // out from underneath us while we're in the process of opening
+        // it up.
+        EditLogFile elf = fjm.getLogFile(
+            segmentTxId);
+        if (elf == null) {
+          response.sendError(HttpServletResponse.SC_NOT_FOUND,
+              "No edit log found starting at txid " + segmentTxId);
+          return;
+        }
+        editFile = elf.getFile();
+        GetImageServlet.setVerificationHeaders(response, editFile);
+        GetImageServlet.setFileNameHeaders(response, editFile);
+        editFileIn = new FileInputStream(editFile);
+      }
+      
+      DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
+
+      // send edits
+      TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
+
+    } catch (Throwable t) {
+      String errMsg = "getedit failed. " + StringUtils.stringifyException(t);
+      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg);
+      throw new IOException(errMsg);
+    }
+  }
+
+  public static String buildPath(String journalId, long segmentTxId,
+      NamespaceInfo nsInfo) {
+    StringBuilder path = new StringBuilder("/getJournal?");
+    try {
+      path.append(JOURNAL_ID_PARAM).append("=")
+          .append(URLEncoder.encode(journalId, "UTF-8"));
+      path.append("&" + SEGMENT_TXID_PARAM).append("=")
+          .append(segmentTxId);
+      path.append("&" + STORAGEINFO_PARAM).append("=")
+          .append(URLEncoder.encode(nsInfo.toColonSeparatedString(), "UTF-8"));
+    } catch (UnsupportedEncodingException e) {
+      // Never get here -- everyone supports UTF-8
+      throw new RuntimeException(e);
+    }
+    return path.toString();
+  }
+}
\ No newline at end of file
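
As a small usage illustration of buildPath() above (the journal id, txid, host, and port are invented; 8480 is assumed here as the JournalNode HTTP port), a reader combines the returned path with a JournalNode's HTTP address:

    String path = GetJournalEditServlet.buildPath("myjournal", 12345L, nsInfo);
    // -> /getJournal?jid=myjournal&segmentTxId=12345&storageInfo=<encoded nsInfo string>
    URL fetchUrl = new URL("http", "jn1.example.com", 8480, path);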

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/**
+ * A {@link Storage} implementation for the {@link JournalNode}.
+ * 
+ * The JN has a storage directory for each namespace for which it stores
+ * metadata. There is only a single directory per JN in the current design.
+ */
+class JNStorage extends Storage {
+
+  private final FileJournalManager fjm;
+  private final StorageDirectory sd;
+  private boolean lazyInitted = false;
+
+  /**
+   * @param logDir the path to the directory in which data will be stored
+   * @param errorReporter a callback to report errors
+   */
+  protected JNStorage(File logDir, StorageErrorReporter errorReporter) {
+    super(NodeType.JOURNAL_NODE);
+    
+    sd = new StorageDirectory(logDir);
+    this.addStorageDir(sd);
+    this.fjm = new FileJournalManager(sd, errorReporter);
+  }
+  
+  FileJournalManager getJournalManager() {
+    return fjm;
+  }
+
+  @Override
+  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
+    return false;
+  }
+
+  /**
+   * Find an edits file spanning the given transaction ID range.
+   * If no such file exists, an exception is thrown.
+   */
+  File findFinalizedEditsFile(long startTxId, long endTxId) throws IOException {
+    File ret = new File(sd.getCurrentDir(),
+        NNStorage.getFinalizedEditsFileName(startTxId, endTxId));
+    if (!ret.exists()) {
+      throw new IOException(
+          "No edits file for range " + startTxId + "-" + endTxId);
+    }
+    return ret;
+  }
+
+  /**
+   * @return the path for an in-progress edits file starting at the given
+   * transaction ID. This does not verify existence of the file. 
+   */
+  File getInProgressEditLog(long startTxId) {
+    return new File(sd.getCurrentDir(),
+        NNStorage.getInProgressEditsFileName(startTxId));
+  }
+
+  /**
+   * @return the path for the file which contains persisted data for the
+   * paxos-like recovery process for the given log segment.
+   */
+  File getPaxosFile(long segmentTxId) {
+    return new File(getPaxosDir(), String.valueOf(segmentTxId));
+  }
+  
+  private File getPaxosDir() {
+    return new File(sd.getCurrentDir(), "paxos");
+  }
+
+  void format(NamespaceInfo nsInfo) throws IOException {
+    setStorageInfo(nsInfo);
+    LOG.info("Formatting journal storage directory " + 
+        sd + " with nsid: " + getNamespaceID());
+    sd.clearDirectory();
+    writeProperties(sd);
+    if (!getPaxosDir().mkdirs()) {
+      throw new IOException("Could not create paxos dir: " + getPaxosDir());
+    }
+  }
+  
+  void analyzeStorage(NamespaceInfo nsInfo) throws IOException {
+    if (lazyInitted) {
+      checkConsistentNamespace(nsInfo);
+      return;
+    }
+    
+    StorageState state = sd.analyzeStorage(StartupOption.REGULAR, this);
+    switch (state) {
+    case NON_EXISTENT:
+    case NOT_FORMATTED:
+      format(nsInfo);
+      // In the NORMAL case below, analyzeStorage() has already locked the
+      // directory for us. But in the case that we format it, we have to
+      // lock it here.
+      // The directory is unlocked in close() when the node shuts down.
+      sd.lock();
+      break;
+    case NORMAL:
+      // Storage directory is already locked by analyzeStorage() - no
+      // need to lock it here.
+      readProperties(sd);
+      checkConsistentNamespace(nsInfo);
+      break;
+      
+    default:
+      LOG.warn("TODO: unhandled state for storage dir " + sd + ": " + state);
+    }
+    lazyInitted  = true;
+  }
+
+  private void checkConsistentNamespace(NamespaceInfo nsInfo)
+      throws IOException {
+    if (nsInfo.getNamespaceID() != getNamespaceID()) {
+      throw new IOException("Incompatible namespaceID for journal " +
+          this.sd + ": NameNode has nsId " + nsInfo.getNamespaceID() +
+          " but storage has nsId " + getNamespaceID());
+    }
+    
+    if (!nsInfo.getClusterID().equals(getClusterID())) {
+      throw new IOException("Incompatible clusterID for journal " +
+          this.sd + ": NameNode has clusterId '" + nsInfo.getClusterID() +
+          "' but storage has clusterId '" + getClusterID() + "'");
+      
+    }
+  }
+
+  public void close() throws IOException {
+    LOG.info("Closing journal storage for " + sd);
+    unlockAll();
+  }
+}
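
Taken together, the helpers above imply roughly the following on-disk layout for one journal (segment file names follow NNStorage's edits_<startTxId>-<endTxId> and edits_inprogress_<startTxId> conventions; zero padding elided):

    <journal dir>/current/
        VERSION                     # storage properties written by format()
        edits_<start>-<end>         # finalized segments (findFinalizedEditsFile)
        edits_inprogress_<start>    # the currently open segment (getInProgressEditLog)
        paxos/<segmentTxId>         # persisted recovery data (getPaxosFile)

The Journal class below adds a last-promised-epoch file in the same current/ directory.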

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.TextFormat;
+
+/**
+ * A JournalNode can manage journals for several clusters at once.
+ * Each such journal is entirely independent despite being hosted by
+ * the same JVM.
+ */
+class Journal implements Closeable {
+  static final Log LOG = LogFactory.getLog(Journal.class);
+
+
+  // Current writing state
+  private EditLogOutputStream curSegment;
+  private long curSegmentTxId = HdfsConstants.INVALID_TXID;
+  private long nextTxId = HdfsConstants.INVALID_TXID;
+  
+  private final JNStorage storage;
+
+  /**
+   * When a new writer comes along, it asks each node to promise
+   * to ignore requests from any previous writer, as identified
+   * by epoch number. In order to make such a promise, the epoch
+   * number of that writer is stored persistently on disk.
+   */
+  private PersistentLongFile lastPromisedEpoch;
+  private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
+
+  private final FileJournalManager fjm;
+
+  Journal(File logDir, StorageErrorReporter errorReporter) {
+    storage = new JNStorage(logDir, errorReporter);
+
+    File currentDir = storage.getSingularStorageDir().getCurrentDir();
+    this.lastPromisedEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_PROMISED_FILENAME), 0);
+
+    this.fjm = storage.getJournalManager();
+  }
+  
+  /**
+   * Iterate over the edit logs stored locally, and set
+   * {@link #curSegmentTxId} to refer to the most recently written
+   * one.
+   */
+  private synchronized void scanStorage() throws IOException {
+    if (!fjm.getStorageDirectory().getCurrentDir().exists()) {
+      return;
+    }
+    LOG.info("Scanning storage " + fjm);
+    List<EditLogFile> files = fjm.getLogFiles(0);
+    if (!files.isEmpty()) {
+      EditLogFile latestLog = files.get(files.size() - 1);
+      LOG.info("Latest log is " + latestLog);
+      curSegmentTxId = latestLog.getFirstTxId();
+    }
+  }
+
+  /**
+   * Format the local storage with the given namespace.
+   */
+  void format(NamespaceInfo nsInfo) throws IOException {
+    Preconditions.checkState(nsInfo.getNamespaceID() != 0,
+        "can't format with uninitialized namespace info: %s",
+        nsInfo);
+    storage.format(nsInfo);
+  }
+
+  /**
+   * Unlock and release resources.
+   */
+  @Override // Closeable
+  public void close() throws IOException {
+    storage.close();
+  }
+  
+  JNStorage getStorage() {
+    return storage;
+  }
+
+  /**
+   * @return the last epoch which this node has promised not to accept
+   * any lower epoch, or 0 if no promises have been made.
+   */
+  synchronized long getLastPromisedEpoch() throws IOException {
+    return lastPromisedEpoch.get();
+  }
+
+  /**
+   * Try to create a new epoch for this journal.
+   * @param nsInfo the namespace, which is verified for consistency or used to
+   * format, if the Journal has not yet been written to.
+   * @param epoch the epoch to start
+   * @return the status information necessary to begin recovery
+   * @throws IOException if the node has already made a promise to another
+   * writer with a higher epoch number, if the namespace is inconsistent,
+   * or if a disk error occurs.
+   */
+  synchronized NewEpochResponseProto newEpoch(
+      NamespaceInfo nsInfo, long epoch) throws IOException {
+
+    // If the storage is unformatted, format it with this NS.
+    // Otherwise, check that the NN's nsinfo matches the storage.
+    storage.analyzeStorage(nsInfo);
+    
+    if (epoch <= getLastPromisedEpoch()) {
+      throw new IOException("Proposed epoch " + epoch + " <= last promise " +
+          getLastPromisedEpoch());
+    }
+    
+    lastPromisedEpoch.set(epoch);
+    if (curSegment != null) {
+      curSegment.close();
+      curSegment = null;
+    }
+    
+    NewEpochResponseProto.Builder builder =
+        NewEpochResponseProto.newBuilder();
+
+    // TODO: we only need to do this once, not on writer switchover.
+    scanStorage();
+
+    if (curSegmentTxId != HdfsConstants.INVALID_TXID) {
+      builder.setLastSegmentTxId(curSegmentTxId);
+    }
+    
+    return builder.build();
+  }
+
+  /**
+   * Write a batch of edits to the journal.
+   * @see QJournalProtocol#journal(RequestInfo, long, int, byte[])
+   */
+  synchronized void journal(RequestInfo reqInfo, long firstTxnId,
+      int numTxns, byte[] records) throws IOException {
+    checkRequest(reqInfo);
+    
+    // TODO: if a JN goes down and comes back up, then it will throw
+    // this exception on every edit. We should instead send back
+    // a response indicating the log needs to be rolled, which would
+    // mark the logger on the client side as "pending" -- and have the
+    // NN code look for this condition and trigger a roll when it happens.
+    // That way the node can catch back up and rejoin
+    Preconditions.checkState(curSegment != null,
+        "Can't write, no segment open");
+    Preconditions.checkState(nextTxId == firstTxnId,
+        "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
+    
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing txid " + firstTxnId + "-" + (firstTxnId + numTxns - 1));
+    }
+    curSegment.writeRaw(records, 0, records.length);
+    curSegment.setReadyToFlush();
+    curSegment.flush();
+    nextTxId += numTxns;
+  }
+
+  /**
+   * Ensure that the given request is coming from the correct writer and in-order.
+   * @param reqInfo the request info
+   * @throws IOException if the request is invalid.
+   */
+  private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
+    // Invariant 25 from ZAB paper
+    if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
+      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
+          " is less than the last promised epoch " +
+          lastPromisedEpoch.get());
+    }
+    
+    // TODO: should other requests check the _exact_ epoch instead of
+    // the <= check? <= should probably only be necessary for the
+    // first calls
+    
+    // TODO: some check on serial number that they only increase from a given
+    // client
+  }
+
+  /**
+   * Start a new segment at the given txid. The previous segment
+   * must have already been finalized.
+   */
+  public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
+      throws IOException {
+    assert fjm != null;
+    checkRequest(reqInfo);
+    
+    Preconditions.checkState(curSegment == null,
+        "Can't start a log segment, already writing " + curSegment);
+    Preconditions.checkState(nextTxId == txid || nextTxId == HdfsConstants.INVALID_TXID,
+        "Can't start log segment " + txid + " expecting nextTxId=" + nextTxId);
+    curSegment = fjm.startLogSegment(txid);
+    curSegmentTxId = txid;
+    nextTxId = txid;
+  }
+  
+  /**
+   * Finalize the log segment at the given transaction ID.
+   */
+  public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+      long endTxId) throws IOException {
+    checkRequest(reqInfo);
+
+    if (startTxId == curSegmentTxId) {
+      if (curSegment != null) {
+        curSegment.close();
+        curSegment = null;
+      }
+    }
+    
+    FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
+    if (elf.isInProgress()) {
+      // TODO: this is slow to validate when in non-recovery cases
+      // we already know the length here!
+
+      LOG.info("Validating log about to be finalized: " + elf);
+      elf.validateLog();
+      
+      Preconditions.checkState(elf.getLastTxId() == endTxId,
+          "Trying to finalize log %s-%s, but current state of log" +
+          "is %s", startTxId, endTxId, elf);
+      fjm.finalizeLogSegment(startTxId, endTxId);
+    } else {
+      Preconditions.checkArgument(endTxId == elf.getLastTxId(),
+          "Trying to re-finalize already finalized log " +
+              elf + " with different endTxId " + endTxId);
+    }
+  }
+  
+  /**
+   * @see QJournalProtocol#getEditLogManifest(String, long)
+   */
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+      throws IOException {
+    // TODO: check fencing info?
+    RemoteEditLogManifest manifest = new RemoteEditLogManifest(
+        fjm.getRemoteEditLogs(sinceTxId));
+    return manifest;
+  }
+
+  /**
+   * @return the current state of the given segment, or null if the
+   * segment does not exist.
+   */
+  private SegmentStateProto getSegmentInfo(long segmentTxId)
+      throws IOException {
+    EditLogFile elf = fjm.getLogFile(segmentTxId);
+    if (elf == null) {
+      return null;
+    }
+    if (elf.isInProgress()) {
+      elf.validateLog();
+    }
+    if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
+      // no transactions in file
+      throw new AssertionError("TODO: no transactions in file " +
+          elf);
+    }
+    SegmentStateProto ret = SegmentStateProto.newBuilder()
+        .setStartTxId(segmentTxId)
+        .setEndTxId(elf.getLastTxId())
+        .setIsInProgress(elf.isInProgress())
+        .setMd5Sum(ByteString.EMPTY) // TODO
+        .build();
+    LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
+        TextFormat.shortDebugString(ret));
+    return ret;
+  }
+
+  /**
+   * @see QJournalProtocol#prepareRecovery(RequestInfo, long)
+   */
+  public synchronized PrepareRecoveryResponseProto prepareRecovery(
+      RequestInfo reqInfo, long segmentTxId) throws IOException {
+    checkRequest(reqInfo);
+    
+    PrepareRecoveryResponseProto.Builder builder =
+        PrepareRecoveryResponseProto.newBuilder();
+    
+    PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
+    if (previouslyAccepted != null) {
+      builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
+        .setSegmentState(previouslyAccepted.getSegmentState());
+    } else {
+      SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
+      if (segInfo != null) {
+        builder.setSegmentState(segInfo);
+      }
+    }
+    
+    PrepareRecoveryResponseProto resp = builder.build();
+    LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
+        TextFormat.shortDebugString(resp));
+    return resp;
+  }
+
+  /**
+   * @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
+   */
+  public synchronized void acceptRecovery(RequestInfo reqInfo,
+      SegmentStateProto segment, URL fromUrl)
+      throws IOException {
+    checkRequest(reqInfo);
+    long segmentTxId = segment.getStartTxId();
+
+    // TODO: right now, a recovery of a segment when the log is
+    // completely empty (i.e. startLogSegment() but no txns)
+    // will fail this assertion here, since endTxId < startTxId
+    Preconditions.checkArgument(segment.getEndTxId() > 0 &&
+        segment.getEndTxId() >= segmentTxId,
+        "bad recovery state for segment %s: %s",
+        segmentTxId, TextFormat.shortDebugString(segment));
+    
+    PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
+    PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
+        .setAcceptedInEpoch(reqInfo.getEpoch())
+        .setSegmentState(segment)
+        .build();
+    if (oldData != null) {
+      Preconditions.checkState(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
+          "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
+          oldData, newData);
+    }
+
+    SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
+    // TODO: this can be null, in the case that one of the loggers started
+    // the next segment, but others did not! add regression test and null
+    // check in next condition below.
+    
+    // TODO: what if they have the same length but one is finalized and the
+    // other isn't! cover that case.
+    if (currentSegment.getEndTxId() != segment.getEndTxId()) {
+      syncLog(reqInfo, segment, fromUrl);
+    } else {
+      LOG.info("Skipping download of log " +
+          TextFormat.shortDebugString(segment) +
+          ": already have up-to-date logs");
+    }
+    
+    // TODO: is it OK that this is non-atomic?
+    // we might be left with an older epoch recorded, but a newer log
+    
+    persistPaxosData(segmentTxId, newData);
+    LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
+        TextFormat.shortDebugString(newData));
+  }
+
+  /**
+   * Synchronize a log segment from another JournalNode.
+   * @param reqInfo the request info for the recovery IPC
+   * @param segment the expected state of the segment being synchronized
+   * @param url the URL of the JournalNode from which to fetch the segment
+   * @throws IOException if the download or the rename of the temporary file fails
+   */
+  private void syncLog(RequestInfo reqInfo,
+      SegmentStateProto segment, URL url) throws IOException {
+    String tmpFileName =
+        "synclog_" + segment.getStartTxId() + "_" +
+        reqInfo.getEpoch() + "." + reqInfo.getIpcSerialNumber();
+    
+    List<File> localPaths = storage.getFiles(null, tmpFileName);
+    assert localPaths.size() == 1;
+    File tmpFile = localPaths.get(0);
+ 
+    boolean success = false;
+
+    LOG.info("Synchronizing log " +
+        TextFormat.shortDebugString(segment) + " from " + url);
+    TransferFsImage.doGetUrl(url, localPaths, storage, true);
+    assert tmpFile.exists();
+    try {
+      success = tmpFile.renameTo(storage.getInProgressEditLog(
+          segment.getStartTxId()));
+      if (success) {
+        // If we're synchronizing the latest segment, update our cached
+        // info.
+        // TODO: can this be done more generally?
+        if (curSegmentTxId == segment.getStartTxId()) {
+          nextTxId = segment.getEndTxId() + 1;
+        }
+      }
+    } finally {
+      if (!success) {
+        if (!tmpFile.delete()) {
+          LOG.warn("Failed to delete temporary file " + tmpFile);
+        }
+      }
+    }
+  }
+
+  /**
+   * Retrieve the persisted data for recovering the given segment from disk.
+   */
+  private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId)
+      throws IOException {
+    File f = storage.getPaxosFile(segmentTxId);
+    if (!f.exists()) {
+      // Default instance has no fields filled in (they're optional)
+      return null;
+    }
+    
+    InputStream in = new FileInputStream(f);
+    try {
+      PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
+      Preconditions.checkState(ret != null &&
+          ret.getSegmentState().getStartTxId() == segmentTxId,
+          "Bad persisted data for segment %s: %s",
+          segmentTxId, ret);
+      return ret;
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  /**
+   * Persist data for recovering the given segment from disk.
+   */
+  private void persistPaxosData(long segmentTxId,
+      PersistedRecoveryPaxosData newData) throws IOException {
+    File f = storage.getPaxosFile(segmentTxId);
+    boolean success = false;
+    AtomicFileOutputStream fos = new AtomicFileOutputStream(f);
+    try {
+      newData.writeDelimitedTo(fos);
+      fos.write('\n');
+      // Write human-readable data after the protobuf. This is only
+      // to assist in debugging -- it's not parsed at all.
+      OutputStreamWriter writer = new OutputStreamWriter(fos);
+      
+      writer.write(String.valueOf(newData));
+      writer.write('\n');
+      writer.flush();
+      
+      fos.flush();
+      success = true;
+    } finally {
+      if (success) {
+        IOUtils.closeStream(fos);
+      } else {
+        fos.abort();
+      }
+    }
+  }
+}
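
To make the calling sequence concrete, the sketch below drives a single Journal through the normal write path and then a recovery. It is a hypothetical walk-through with invented values; in the real system these calls arrive over RPC from a quorum client, and since the class is package-private the sketch lives in the same package.

    package org.apache.hadoop.hdfs.qjournal.server;

    import java.io.File;
    import java.io.IOException;
    import java.net.URL;

    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
    import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    /** Sketch only: a hypothetical walk-through of the write and recovery paths. */
    class JournalWalkThroughSketch {
      static void walkThrough(File dir, StorageErrorReporter reporter,
          NamespaceInfo nsInfo, int numTxns, byte[] edits, long lastTxId,
          SegmentStateProto chosenSegment, URL fromUrl) throws IOException {
        Journal journal = new Journal(dir, reporter);
        journal.format(nsInfo);

        // A new writer claims epoch 1; the promise is persisted to disk.
        journal.newEpoch(nsInfo, 1);

        // Normal write path: open a segment, append a batch of edits, finalize.
        RequestInfo req = new RequestInfo("myjournal", 1, 1);
        journal.startLogSegment(req, 1);
        journal.journal(req, 1, numTxns, edits);
        journal.finalizeLogSegment(req, 1, lastTxId);

        // If a writer dies mid-segment, the next writer (epoch 2) recovers it.
        RequestInfo req2 = new RequestInfo("myjournal", 2, 1);
        PrepareRecoveryResponseProto prep =
            journal.prepareRecovery(req2, chosenSegment.getStartTxId());
        // The client compares prepare responses across the quorum, chooses a
        // segment state, and asks each node to accept (and, if its own copy
        // differs, download) that segment before finalizing it.
        journal.acceptRecovery(req2, chosenSegment, fromUrl);
        journal.finalizeLogSegment(req2, chosenSegment.getStartTxId(),
            chosenSegment.getEndTxId());
      }
    }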

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * The JournalNode is a daemon which allows namenodes using
+ * the QuorumJournalManager to log and retrieve edits stored
+ * remotely. It is a thin wrapper around a local edit log
+ * directory with the addition of facilities to participate
+ * in the quorum protocol.
+ */
+@InterfaceAudience.Private
+public class JournalNode implements Tool, Configurable {
+  public static final Log LOG = LogFactory.getLog(JournalNode.class);
+  private Configuration conf;
+  private JournalNodeRpcServer rpcServer;
+  private JournalNodeHttpServer httpServer;
+  private Map<String, Journal> journalsById = Maps.newHashMap();
+
+  static {
+    HdfsConfiguration.init();
+  }
+  
+  /**
+   * When stopped, the daemon will exit with this code. 
+   */
+  private int resultCode = 0;
+
+  synchronized Journal getOrCreateJournal(String jid) {
+    QuorumJournalManager.checkJournalId(jid);
+    
+    Journal journal = journalsById.get(jid);
+    if (journal == null) {
+      File logDir = getLogDir(jid);
+      LOG.info("Initializing journal in directory " + logDir);      
+      journal = new Journal(logDir, new ErrorReporter());
+      journalsById.put(jid, journal);
+    }
+    
+    return journal;
+  }
+
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    start();
+    return join();
+  }
+
+  /**
+   * Start listening for edits via RPC.
+   */
+  public void start() throws IOException {
+    Preconditions.checkState(!isStarted(), "JN already running");
+    
+    DefaultMetricsSystem.initialize("JournalNode");
+    JvmMetrics.create("JournalNode",
+        conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
+        DefaultMetricsSystem.instance());
+    
+    httpServer = new JournalNodeHttpServer(conf, this);
+    httpServer.start();
+
+    rpcServer = new JournalNodeRpcServer(conf, this);
+    rpcServer.start();
+  }
+
+  public boolean isStarted() {
+    return rpcServer != null;
+  }
+
+  /**
+   * @return the address the IPC server is bound to
+   */
+  public InetSocketAddress getBoundIpcAddress() {
+    return rpcServer.getAddress();
+  }
+  
+
+  public InetSocketAddress getBoundHttpAddress() {
+    return httpServer.getAddress();
+  }
+
+
+  /**
+   * Stop the daemon with the given status code
+   * @param rc the status code with which to exit (non-zero
+   * should indicate an error)
+   */
+  public void stop(int rc) {
+    this.resultCode = rc;
+    
+    if (rpcServer != null) { 
+      rpcServer.stop();
+    }
+
+    if (httpServer != null) {
+      try {
+        httpServer.stop();
+      } catch (IOException ioe) {
+        LOG.warn("Unable to stop HTTP server for " + this, ioe);
+      }
+    }
+    
+    for (Journal j : journalsById.values()) {
+      IOUtils.cleanup(LOG, j);
+    }
+  }
+
+  /**
+   * Wait for the daemon to exit.
+   * @return the result code (non-zero if error)
+   */
+  int join() throws InterruptedException {
+    if (rpcServer != null) {
+      rpcServer.join();
+    }
+    return resultCode;
+  }
+  
+  public void stopAndJoin(int rc) throws InterruptedException {
+    stop(rc);
+    join();
+  }
+
+  /**
+   * Return the directory inside our configured storage
+   * dir which corresponds to a given journal. 
+   * @param jid the journal identifier
+   * @return the file, which may or may not exist yet
+   */
+  private File getLogDir(String jid) {
+    String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
+    Preconditions.checkArgument(jid != null &&
+        !jid.isEmpty(),
+        "bad journal identifier: %s", jid);
+    return new File(new File(dir), jid);
+  }
+
+  
+  private class ErrorReporter implements StorageErrorReporter {
+    @Override
+    public void reportErrorOnFile(File f) {
+      LOG.fatal("Error reported on file " + f + "... exiting",
+          new Exception());
+      stop(1);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new JournalNode(), args));
+  }
+}
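
The class above is runnable on its own, outside the usual daemon scripts. Below is a minimal sketch (not part of this patch) of starting and stopping a JournalNode in-process using only the public methods defined here; the scratch directory and the port-0 (ephemeral) addresses are illustrative assumptions, not defaults introduced by this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;

public class JournalNodeSmokeTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative scratch location; any writable local directory works.
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/tmp/jn-edits");
    // Port 0 requests an ephemeral port, as handled by the servers above.
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");

    JournalNode jn = new JournalNode();
    jn.setConf(conf);
    jn.start();                                   // HTTP server first, then RPC
    System.out.println("JN RPC bound to " + jn.getBoundIpcAddress());
    jn.stopAndJoin(0);                            // status code 0 = clean shutdown
  }
}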

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.servlet.ServletContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Encapsulates the HTTP server started by the JournalNode daemon.
+ */
+@InterfaceAudience.Private
+public class JournalNodeHttpServer {
+  public static final Log LOG = LogFactory.getLog(
+      JournalNodeHttpServer.class);
+
+  public static final String JN_ATTRIBUTE_KEY = "localjournal";
+
+  private HttpServer httpServer;
+  private int infoPort;
+  private JournalNode localJournalNode;
+
+  private final Configuration conf;
+
+  JournalNodeHttpServer(Configuration conf, JournalNode jn) {
+    this.conf = conf;
+    this.localJournalNode = jn;
+  }
+
+  void start() throws IOException {
+    final InetSocketAddress bindAddr = getAddress(conf);
+
+    // Initialize the web server; it serves edit log segments via the
+    // /getJournal servlet registered below.
+    LOG.info("Starting web server as: "
+        + UserGroupInformation.getCurrentUser().getUserName());
+
+    int tmpInfoPort = bindAddr.getPort();
+    httpServer = new HttpServer("journal", bindAddr.getHostName(),
+        tmpInfoPort, tmpInfoPort == 0, conf, new AccessControlList(conf
+            .get(DFS_ADMIN, " "))) {
+      {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          initSpnego(conf, DFS_JOURNALNODE_USER_NAME_KEY,
+              DFS_JOURNALNODE_KEYTAB_FILE_KEY);
+        }
+      }
+    };
+    httpServer.setAttribute(JN_ATTRIBUTE_KEY, localJournalNode);
+    httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
+    httpServer.addInternalServlet("getJournal", "/getJournal",
+        GetJournalEditServlet.class, true);
+    httpServer.start();
+
+    // The web-server port can be ephemeral... ensure we have the correct info
+    infoPort = httpServer.getPort();
+
+    LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort);
+  }
+
+  void stop() throws IOException {
+    if (httpServer != null) {
+      try {
+        httpServer.stop();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+  
+  /**
+   * Return the actual address bound to by the running server.
+   */
+  public InetSocketAddress getAddress() {
+    InetSocketAddress addr = httpServer.getListenerAddress();
+    assert addr.getPort() != 0;
+    return addr;
+  }
+
+  private static InetSocketAddress getAddress(Configuration conf) {
+    String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT);
+    return NetUtils.createSocketAddr(addr,
+        DFSConfigKeys.DFS_JOURNALNODE_HTTP_PORT_DEFAULT,
+        DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY);
+  }
+
+  public static Journal getJournalFromContext(ServletContext context, String jid) {
+    JournalNode jn = (JournalNode)context.getAttribute(JN_ATTRIBUTE_KEY);
+    return jn.getOrCreateJournal(jid);
+  }
+
+  public static Configuration getConfFromContext(ServletContext context) {
+    return (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+  }
+}
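
The two static helpers at the end exist so that servlets registered on this server (such as the GetJournalEditServlet bound in start()) can recover the JournalNode and Configuration stashed as servlet-context attributes. A hedged sketch of that access pattern follows; the servlet itself is hypothetical and is assumed to live in the same package so the Journal type is visible.

package org.apache.hadoop.hdfs.qjournal.server;

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;

/** Hypothetical servlet; only illustrates the context-attribute helpers. */
public class JournalStatusServlet extends HttpServlet {
  private static final long serialVersionUID = 1L;

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws IOException {
    // Both attributes were set in JournalNodeHttpServer.start().
    Configuration conf =
        JournalNodeHttpServer.getConfFromContext(getServletContext());
    Journal journal = JournalNodeHttpServer.getJournalFromContext(
        getServletContext(), req.getParameter("jid")); // journal id from the query string
    resp.getWriter().println("conf loaded: " + (conf != null)
        + ", journal: " + journal);
  }
}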

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.protobuf.BlockingService;
+
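+/**
+ * RPC server run by the JournalNode, exposing {@link QJournalProtocol}.
+ * Each call resolves the named journal through the owning JournalNode's
+ * getOrCreateJournal() and delegates to it.
+ */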
+class JournalNodeRpcServer implements QJournalProtocol {
+
+  private static final int HANDLER_COUNT = 5;
+  private JournalNode jn;
+  private Server server;
+
+  JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
+    this.jn = jn;
+    
+    InetSocketAddress addr = getAddress(conf);
+    RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
+        ProtobufRpcEngine.class);
+    QJournalProtocolServerSideTranslatorPB translator =
+        new QJournalProtocolServerSideTranslatorPB(this);
+    BlockingService service = QJournalProtocolService
+        .newReflectiveBlockingService(translator);
+    
+    this.server = RPC.getServer(
+        QJournalProtocolPB.class,
+        service, addr.getHostName(), addr.getPort(),
+        HANDLER_COUNT, false, conf,
+        null /* secretManager */);
+  }
+
+  void start() {
+    this.server.start();
+  }
+
+  public InetSocketAddress getAddress() {
+    return server.getListenerAddress();
+  }
+  
+  void join() throws InterruptedException {
+    this.server.join();
+  }
+  
+  void stop() {
+    this.server.stop();
+  }
+  
+  private static InetSocketAddress getAddress(Configuration conf) {
+    String addr = conf.get(
+        DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT);
+    return NetUtils.createSocketAddr(addr, 0,
+        DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY);
+  }
+
+  @Override
+  public GetJournalStateResponseProto getJournalState(String journalId)
+        throws IOException {
+    long epoch = jn.getOrCreateJournal(journalId).getLastPromisedEpoch(); 
+    return GetJournalStateResponseProto.newBuilder()
+        .setLastPromisedEpoch(epoch)
+        .setHttpPort(jn.getBoundHttpAddress().getPort())
+        .build();
+  }
+
+  @Override
+  public NewEpochResponseProto newEpoch(String journalId,
+      NamespaceInfo nsInfo,
+      long epoch) throws IOException {
+    return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
+  }
+
+
+  @Override
+  public void journal(RequestInfo reqInfo, long firstTxnId,
+      int numTxns, byte[] records) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+       .journal(reqInfo, firstTxnId, numTxns, records);
+  }
+
+  @Override
+  public void startLogSegment(RequestInfo reqInfo, long txid)
+      throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .startLogSegment(reqInfo, txid);
+  }
+
+  @Override
+  public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+      long endTxId) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .finalizeLogSegment(reqInfo, startTxId, endTxId);
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(String jid,
+      long sinceTxId) throws IOException {
+    
+    RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
+        .getEditLogManifest(sinceTxId);
+    
+    return GetEditLogManifestResponseProto.newBuilder()
+        .setManifest(PBHelper.convert(manifest))
+        .setHttpPort(jn.getBoundHttpAddress().getPort())
+        .build();
+  }
+
+  @Override
+  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+      long segmentTxId) throws IOException {
+    return jn.getOrCreateJournal(reqInfo.getJournalId())
+        .prepareRecovery(reqInfo, segmentTxId);
+  }
+
+  @Override
+  public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log,
+      URL fromUrl) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+        .acceptRecovery(reqInfo, log, fromUrl);
+  }
+
+}
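
Every method above follows the same dispatch pattern: look the journal up by id and delegate. Because the class implements QJournalProtocol directly, the calls can be exercised against it (or against any client-side proxy for the same interface) without touching protobuf plumbing; a hedged sketch, where the journal id passed in is illustrative:

import java.io.IOException;

import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;

class JournalStateProbe {
  /** Prints the promised epoch and HTTP port reported for one journal. */
  static void probe(QJournalProtocol proto, String jid) throws IOException {
    GetJournalStateResponseProto state = proto.getJournalState(jid);
    System.out.println("journal " + jid
        + ": lastPromisedEpoch=" + state.getLastPromisedEpoch()
        + ", httpPort=" + state.getHttpPort());
  }
}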

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Jul 20 00:25:50 2012
@@ -39,7 +39,8 @@ public final class HdfsServerConstants {
    */
   static public enum NodeType {
     NAME_NODE,
-    DATA_NODE;
+    DATA_NODE,
+    JOURNAL_NODE;
   }
 
   /** Startup options */

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Jul 20 00:25:50 2012
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Preconditions;
+
 
 
 /**
@@ -75,7 +77,7 @@ public abstract class Storage extends St
   /** Layout versions of 0.20.203 release */
   public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
 
-  private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
+  public    static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
   public    static final String STORAGE_DIR_PREVIOUS  = "previous";
@@ -719,6 +721,15 @@ public abstract class Storage extends St
     return storageDirs.get(idx);
   }
   
+  /**
+   * @return the storage directory, with the precondition that this storage
+   * has exactly one storage directory
+   */
+  public StorageDirectory getSingularStorageDir() {
+    Preconditions.checkState(storageDirs.size() == 1);
+    return storageDirs.get(0);
+  }
+  
   protected void addStorageDir(StorageDirectory sd) {
     storageDirs.add(sd);
   }
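
getSingularStorageDir() above is a convenience for storages known to hold exactly one directory, guarded by a Preconditions check rather than a silent get(0). A hedged sketch of the intended call pattern; the helper class and method name here are made up for illustration:

import java.io.File;

import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;

class SingularDirSketch {
  /** Works only for storages configured with exactly one directory. */
  static File currentDirOf(Storage storage) {
    StorageDirectory sd = storage.getSingularStorageDir(); // IllegalStateException otherwise
    return sd.getCurrentDir();                             // .../current under that directory
  }
}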

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Jul 20 00:25:50 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -34,7 +35,8 @@ import com.google.common.base.Preconditi
  * to progress concurrently to flushes without allocating new buffers each
  * time.
  */
-class EditsDoubleBuffer {
+@InterfaceAudience.Private
+public class EditsDoubleBuffer {
 
   private TxnBuffer bufCurrent; // current buffer for writing
   private TxnBuffer bufReady; // buffer ready for flushing
@@ -51,11 +53,11 @@ class EditsDoubleBuffer {
     bufCurrent.writeOp(op);
   }
 
-  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
     bufCurrent.write(bytes, offset, length);
   }
   
-  void close() throws IOException {
+  public void close() throws IOException {
     Preconditions.checkNotNull(bufCurrent);
     Preconditions.checkNotNull(bufReady);
 
@@ -69,7 +71,7 @@ class EditsDoubleBuffer {
     bufCurrent = bufReady = null;
   }
   
-  void setReadyToFlush() {
+  public void setReadyToFlush() {
     assert isFlushed() : "previous data not flushed yet";
     TxnBuffer tmp = bufReady;
     bufReady = bufCurrent;
@@ -80,12 +82,12 @@ class EditsDoubleBuffer {
    * Writes the content of the "ready" buffer to the given output stream,
    * and resets it. Does not swap any buffers.
    */
-  void flushTo(OutputStream out) throws IOException {
+  public void flushTo(OutputStream out) throws IOException {
     bufReady.writeTo(out); // write data to file
     bufReady.reset(); // erase all data in the buffer
   }
   
-  boolean shouldForceSync() {
+  public boolean shouldForceSync() {
     return bufCurrent.size() >= initBufferSize;
   }
 
@@ -120,6 +122,12 @@ class EditsDoubleBuffer {
     return bufReady.numTxns;
   }
 
+  /**
+   * @return the number of bytes that are ready to be flushed
+   */
+  public int countReadyBytes() {
+    return bufReady.size();
+  }
   
   private static class TxnBuffer extends DataOutputBuffer {
     long firstTxId;
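
The methods made public above are enough to drive the full double-buffer cycle from outside the package, which is presumably why this commit widens their visibility. A hedged sketch of that cycle; the buffer size, payload, and output stream are illustrative, and the int constructor argument is assumed from the initBufferSize field referenced in shouldForceSync():

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.EditsDoubleBuffer;

class DoubleBufferSketch {
  static void cycle(byte[] records) throws IOException {
    // Constructor argument assumed to be the initial buffer size.
    EditsDoubleBuffer buf = new EditsDoubleBuffer(256 * 1024);

    buf.writeRaw(records, 0, records.length); // accumulate in the current buffer
    buf.setReadyToFlush();                    // swap current and ready buffers
    int staged = buf.countReadyBytes();       // bytes now waiting to be flushed

    ByteArrayOutputStream out = new ByteArrayOutputStream(staged);
    buf.flushTo(out);                         // drain the ready buffer only
    buf.close();                              // both buffers must be empty by now
  }
}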

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 20 00:25:50 2012
@@ -1136,6 +1136,7 @@ public class FSEditLog  {
       journalSet.recoverUnfinalizedSegments();
     } catch (IOException ex) {
      // All journals have failed; this is handled in logSync.
+      // TODO: are we sure this is OK?
     }
   }
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Jul 20 00:25:50 2012
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.ComparisonChain;
@@ -50,7 +52,8 @@ import com.google.common.collect.Compari
  * Note: this class is not thread-safe and should be externally
  * synchronized.
  */
-class FileJournalManager implements JournalManager {
+@InterfaceAudience.Private
+public class FileJournalManager implements JournalManager {
   private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
 
   private final StorageDirectory sd;
@@ -147,7 +150,7 @@ class FileJournalManager implements Jour
    * @return a list of remote edit logs
    * @throws IOException if edit logs cannot be listed.
    */
-  List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
+  public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@@ -165,6 +168,8 @@ class FileJournalManager implements Jour
       }
     }
     
+    Collections.sort(ret);
+    
     return ret;
   }
 
@@ -178,7 +183,7 @@ class FileJournalManager implements Jour
    * @throws IOException
    *           IOException thrown for invalid logDir
    */
-  static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+  public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
     return matchEditLogs(FileUtil.listFiles(logDir));
   }
   
@@ -206,7 +211,7 @@ class FileJournalManager implements Jour
         try {
           long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
           ret.add(
-              new EditLogFile(f, startTxId, startTxId, true));
+              new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
         } catch (NumberFormatException nfe) {
           LOG.error("In-progress edits file " + f + " has improperly " +
                     "formatted transaction ID");
@@ -304,7 +309,7 @@ class FileJournalManager implements Jour
     }
   }
 
-  List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+  public List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<EditLogFile> logFiles = Lists.newArrayList();
@@ -320,6 +325,32 @@ class FileJournalManager implements Jour
 
     return logFiles;
   }
+  
+  public EditLogFile getLogFile(long startTxId) throws IOException {
+    return getLogFile(sd.getCurrentDir(), startTxId);
+  }
+  
+  public static EditLogFile getLogFile(File dir, long startTxId)
+      throws IOException {
+    List<EditLogFile> files = matchEditLogs(dir);
+    List<EditLogFile> ret = Lists.newLinkedList();
+    for (EditLogFile elf : files) {
+      if (elf.getFirstTxId() == startTxId) {
+        ret.add(elf);
+      }
+    }
+    
+    if (ret.isEmpty()) {
+      // no matches
+      return null;
+    } else if (ret.size() == 1) {
+      return ret.get(0);
+    } else {
+      throw new IllegalStateException("More than one log segment in " + 
+          dir + " starting at txid " + startTxId + ": " +
+          Joiner.on(", ").join(ret));
+    }
+  }
 
   @Override
   public String toString() {
@@ -329,7 +360,8 @@ class FileJournalManager implements Jour
   /**
    * Record of an edit log that has been located and had its filename parsed.
    */
-  static class EditLogFile {
+  @InterfaceAudience.Private
+  public static class EditLogFile {
     private File file;
     private final long firstTxId;
     private long lastTxId;
@@ -362,17 +394,20 @@ class FileJournalManager implements Jour
       assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
       assert file != null;
       
+      Preconditions.checkArgument(!isInProgress ||
+          lastTxId == HdfsConstants.INVALID_TXID);
+      
       this.firstTxId = firstTxId;
       this.lastTxId = lastTxId;
       this.file = file;
       this.isInProgress = isInProgress;
     }
     
-    long getFirstTxId() {
+    public long getFirstTxId() {
       return firstTxId;
     }
     
-    long getLastTxId() {
+    public long getLastTxId() {
       return lastTxId;
     }
     
@@ -385,17 +420,17 @@ class FileJournalManager implements Jour
      * This will update the lastTxId of the EditLogFile, or mark the
      * file as having a corrupt header if validation finds one.
      */
-    void validateLog() throws IOException {
+    public void validateLog() throws IOException {
       EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
       this.lastTxId = val.getEndTxId();
       this.hasCorruptHeader = val.hasCorruptHeader();
     }
 
-    boolean isInProgress() {
+    public boolean isInProgress() {
       return isInProgress;
     }
 
-    File getFile() {
+    public File getFile() {
       return file;
     }
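
The new getLogFile() lookups added in this file return null when no segment starts at the requested transaction ID, the single match when there is one, and throw IllegalStateException on ambiguity. A hedged sketch of the expected call pattern, using the EditLogFile accessors made public above; the directory path and txid are illustrative:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;

class SegmentLookupSketch {
  static void lookup() throws IOException {
    File currentDir = new File("/data/jn/myjournal/current");  // illustrative path
    EditLogFile elf = FileJournalManager.getLogFile(currentDir, 42L);
    if (elf == null) {
      System.out.println("no segment starts at txid 42");
    } else {
      if (elf.isInProgress()) {
        elf.validateLog();   // scans the file to discover the real lastTxId
      }
      System.out.println("segment " + elf.getFile() + " covers "
          + elf.getFirstTxId() + "-" + elf.getLastTxId());
    }
  }
}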
     

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Fri Jul 20 00:25:50 2012
@@ -229,6 +229,13 @@ public class JournalSet implements Journ
       }
       jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
     }
+    chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
+  }
+  
+  public static void chainAndMakeRedundantStreams(
+      Collection<EditLogInputStream> outStreams,
+      PriorityQueue<EditLogInputStream> allStreams,
+      long fromTxId, boolean inProgressOk) {
     // We want to group together all the streams that start on the same start
     // transaction ID.  To do this, we maintain an accumulator (acc) of all
     // the streams we've seen at a given start transaction ID.  When we see a
@@ -246,7 +253,7 @@ public class JournalSet implements Journ
         if (accFirstTxId == elis.getFirstTxId()) {
           acc.add(elis);
         } else if (accFirstTxId < elis.getFirstTxId()) {
-          streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+          outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.add(elis);
         } else if (accFirstTxId > elis.getFirstTxId()) {
@@ -257,7 +264,7 @@ public class JournalSet implements Journ
       }
     }
     if (!acc.isEmpty()) {
-      streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+      outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
     }
   }
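
The refactoring above extracts the stream-grouping logic into a public static method so that callers outside JournalSet (presumably the quorum journal client) can chain overlapping streams the same way. A hedged wrapper showing the new entry point; the caller is responsible for supplying a queue already ordered by each stream's first transaction ID, matching the accumulator logic described in the comment above:

import java.util.ArrayList;
import java.util.Collection;
import java.util.PriorityQueue;

import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;

class StreamChainingSketch {
  /** 'sorted' must already be ordered by each stream's first txid. */
  static Collection<EditLogInputStream> chain(
      PriorityQueue<EditLogInputStream> sorted, long fromTxId) {
    Collection<EditLogInputStream> out = new ArrayList<EditLogInputStream>();
    // Streams sharing a first txid are grouped into redundant streams.
    JournalSet.chainAndMakeRedundantStreams(out, sorted, fromTxId, true);
    return out;
  }
}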

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java Fri Jul 20 00:25:50 2012
@@ -41,6 +41,14 @@ final class NameNodeResourcePolicy {
   static boolean areResourcesAvailable(
       Collection<? extends CheckableNameNodeResource> resources,
       int minimumRedundantResources) {
+
+    // TODO: workaround:
+    // - during startup, if there are no edits dirs on disk, then there is
+    // a call to areResourcesAvailable() with no dirs at all, which was
+    // previously causing the NN to enter safemode
+    if (resources.isEmpty()) {
+      return true;
+    }
     
     int requiredResourceCount = 0;
     int redundantResourceCount = 0;