Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/12/05 20:22:25 UTC

svn commit: r1417596 [2/6] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+/**
+ * Represents a set of calls for which a quorum of results is needed.
+ * @param <KEY> a key used to identify each of the outgoing calls
+ * @param <RESULT> the type of the call result
+ */
+class QuorumCall<KEY, RESULT> {
+  private final Map<KEY, RESULT> successes = Maps.newHashMap();
+  private final Map<KEY, Throwable> exceptions = Maps.newHashMap();
+
+  /**
+   * Interval, in milliseconds, at which a progress message is logged
+   * while waiting for a quorum call.
+   */
+  private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
+  
+  /**
+   * Start logging messages at INFO level periodically after waiting for
+   * this fraction of the configured timeout for any call.
+   */
+  private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
+  /**
+   * Start logging messages at WARN level after waiting for this
+   * fraction of the configured timeout for any call.
+   */
+  private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
+  
+  static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
+      Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
+    final QuorumCall<KEY, RESULT> qr = new QuorumCall<KEY, RESULT>();
+    for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
+      Preconditions.checkArgument(e.getValue() != null,
+          "null future for key: " + e.getKey());
+      Futures.addCallback(e.getValue(), new FutureCallback<RESULT>() {
+        @Override
+        public void onFailure(Throwable t) {
+          qr.addException(e.getKey(), t);
+        }
+
+        @Override
+        public void onSuccess(RESULT res) {
+          qr.addResult(e.getKey(), res);
+        }
+      });
+    }
+    return qr;
+  }
+  
+  private QuorumCall() {
+    // Only instantiated from factory method above
+  }
+  
+  /**
+   * Wait for the quorum to achieve a certain number of responses.
+   * 
+   * Note that, even after this returns, more responses may arrive,
+   * causing the return value of other methods in this class to change.
+   *
+   * @param minResponses return as soon as this many responses have been
+   * received, regardless of whether they are successes or exceptions
+   * @param minSuccesses return as soon as this many successful (non-exception)
+   * responses have been received
+   * @param maxExceptions return as soon as more than this many exception
+   * responses have been received. Pass 0 to return immediately if any
+   * exception is received.
+   * @param millis the maximum number of milliseconds to wait
+   * @param operationName a human-readable name for the operation, used in
+   * progress log messages
+   * @throws InterruptedException if the thread is interrupted while waiting
+   * @throws TimeoutException if the specified timeout elapses before
+   * achieving the desired conditions
+   */
+  public synchronized void waitFor(
+      int minResponses, int minSuccesses, int maxExceptions,
+      int millis, String operationName)
+      throws InterruptedException, TimeoutException {
+    long st = Time.monotonicNow();
+    long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
+    long et = st + millis;
+    while (true) {
+      checkAssertionErrors();
+      if (minResponses > 0 && countResponses() >= minResponses) return;
+      if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
+      if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
+      long now = Time.monotonicNow();
+      
+      if (now > nextLogTime) {
+        long waited = now - st;
+        String msg = String.format(
+            "Waited %s ms (timeout=%s ms) for a response for %s",
+            waited, millis, operationName);
+        if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
+          QuorumJournalManager.LOG.warn(msg);
+        } else {
+          QuorumJournalManager.LOG.info(msg);
+        }
+        nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
+      }
+      long rem = et - now;
+      if (rem <= 0) {
+        throw new TimeoutException();
+      }
+      rem = Math.min(rem, nextLogTime - now);
+      rem = Math.max(rem, 1);
+      wait(rem);
+    }
+  }
+
+  /**
+   * Check if any of the responses came back with an AssertionError.
+   * If so, it re-throws it, even if there was a quorum of responses.
+   * This code only runs if assertions are enabled for this class,
+   * otherwise it should JIT itself away.
+   * 
+   * This is done since AssertionError indicates programmer confusion
+   * rather than some kind of expected issue, and thus in the context
+   * of test cases we'd like to actually fail the test case instead of
+   * continuing through.
+   */
+  private synchronized void checkAssertionErrors() {
+    boolean assertsEnabled = false;
+    assert assertsEnabled = true; // sets to true if enabled
+    if (assertsEnabled) {
+      for (Throwable t : exceptions.values()) {
+        if (t instanceof AssertionError) {
+          throw (AssertionError)t;
+        } else if (t instanceof RemoteException &&
+            ((RemoteException)t).getClassName().equals(
+                AssertionError.class.getName())) {
+          throw new AssertionError(t);
+        }
+      }
+    }
+  }
+
+  private synchronized void addResult(KEY k, RESULT res) {
+    successes.put(k, res);
+    notifyAll();
+  }
+  
+  private synchronized void addException(KEY k, Throwable t) {
+    exceptions.put(k, t);
+    notifyAll();
+  }
+  
+  /**
+   * @return the total number of calls for which a response has been received,
+   * regardless of whether it threw an exception or returned a successful
+   * result.
+   */
+  public synchronized int countResponses() {
+    return successes.size() + exceptions.size();
+  }
+  
+  /**
+   * @return the number of calls for which a non-exception response has been
+   * received.
+   */
+  public synchronized int countSuccesses() {
+    return successes.size();
+  }
+  
+  /**
+   * @return the number of calls for which an exception response has been
+   * received.
+   */
+  public synchronized int countExceptions() {
+    return exceptions.size();
+  }
+
+  /**
+   * @return the map of successful responses. A copy is made such that this
+   * map will not be further mutated, even if further results arrive for the
+   * quorum.
+   */
+  public synchronized Map<KEY, RESULT> getResults() {
+    return Maps.newHashMap(successes);
+  }
+
+  public synchronized void rethrowException(String msg) throws QuorumException {
+    Preconditions.checkState(!exceptions.isEmpty());
+    throw QuorumException.create(msg, successes, exceptions);
+  }
+
+  public static <K> String mapToString(
+      Map<K, ? extends Message> map) {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (Map.Entry<K, ? extends Message> e : map.entrySet()) {
+      if (!first) {
+        sb.append("\n");
+      }
+      first = false;
+      sb.append(e.getKey()).append(": ")
+        .append(TextFormat.shortDebugString(e.getValue()));
+    }
+    return sb.toString();
+  }
+}
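
A minimal usage sketch (not part of this commit) of how a caller in the same
package is expected to drive QuorumCall. The string keys, futures, timeout,
and operation name are hypothetical; it assumes java.io.IOException,
java.util.Map, java.util.concurrent.TimeoutException, and Guava's
ListenableFuture are imported.

    static void waitForQuorumExample(
        Map<String, ListenableFuture<Void>> futures) throws IOException {
      QuorumCall<String, Void> q = QuorumCall.create(futures);
      try {
        // Return once every node has responded, or a majority has succeeded,
        // or any exception has arrived; otherwise time out after 20 seconds.
        q.waitFor(futures.size(), futures.size() / 2 + 1, 0, 20000, "exampleOp");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted waiting for quorum", e);
      } catch (TimeoutException e) {
        throw new IOException("Timed out waiting for quorum", e);
      }
      if (q.countExceptions() > 0) {
        q.rethrowException("Quorum call failed on some nodes");
      }
    }

This mirrors the pattern used by format() in QuorumJournalManager below:
wait for the call, then rethrow a QuorumException if too many loggers failed.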

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Exception thrown when too many exceptions occur while gathering
+ * responses to a quorum call. 
+ */
+class QuorumException extends IOException {
+
+  /**
+   * Create a QuorumException instance with a descriptive message detailing
+   * the underlying exceptions, as well as any successful responses which
+   * were returned.
+   * @param <K> the key type for the quorum calls
+   * @param <V> the success response type
+   * @param simpleMsg a short description of the failed operation
+   * @param successes any successful responses returned
+   * @param exceptions the exceptions returned
+   */
+  public static <K, V> QuorumException create(
+      String simpleMsg,
+      Map<K, V> successes,
+      Map<K, Throwable> exceptions) {
+    Preconditions.checkArgument(!exceptions.isEmpty(),
+        "Must pass exceptions");
+    
+    StringBuilder msg = new StringBuilder();
+    msg.append(simpleMsg).append(". ");
+    if (!successes.isEmpty()) {
+      msg.append(successes.size()).append(" successful responses:\n");
+      
+      Joiner.on("\n")
+          .useForNull("null [success]")
+          .withKeyValueSeparator(": ")
+          .appendTo(msg, successes);
+      msg.append("\n");
+    }
+    
+    msg.append(exceptions.size() + " exceptions thrown:\n");
+    boolean isFirst = true;
+    
+    for (Map.Entry<K, Throwable> e : exceptions.entrySet()) {
+      if (!isFirst) {
+        msg.append("\n");
+      }
+      isFirst = false;
+      
+      msg.append(e.getKey()).append(": ");
+      
+      if (e.getValue() instanceof RuntimeException) {
+        msg.append(StringUtils.stringifyException(e.getValue()));
+      } else if (e.getValue().getLocalizedMessage() != null) {
+        msg.append(e.getValue().getLocalizedMessage());
+      } else {
+        msg.append(StringUtils.stringifyException(e.getValue()));
+      }
+    }
+    return new QuorumException(msg.toString());
+  }
+
+  private QuorumException(String msg) {
+    super(msg);
+  }
+
+  private static final long serialVersionUID = 1L;
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.TextFormat;
+
+/**
+ * A JournalManager that writes to a set of remote JournalNodes,
+ * requiring a quorum of nodes to ack each write.
+ */
+@InterfaceAudience.Private
+public class QuorumJournalManager implements JournalManager {
+  static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);
+
+  // Timeouts for which the QJM will wait for each of the following actions.
+  private final int startSegmentTimeoutMs;
+  private final int prepareRecoveryTimeoutMs;
+  private final int acceptRecoveryTimeoutMs;
+  private final int finalizeSegmentTimeoutMs;
+  private final int selectInputStreamsTimeoutMs;
+  private final int getJournalStateTimeoutMs;
+  private final int newEpochTimeoutMs;
+  private final int writeTxnsTimeoutMs;
+
+  // Since these don't occur during normal operation, we can
+  // use rather lengthy timeouts, and don't need to make them
+  // configurable.
+  private static final int FORMAT_TIMEOUT_MS = 60000;
+  private static final int HASDATA_TIMEOUT_MS = 60000;
+  
+  private final Configuration conf;
+  private final URI uri;
+  private final NamespaceInfo nsInfo;
+  private boolean isActiveWriter;
+  
+  private final AsyncLoggerSet loggers;
+
+  private int outputBufferCapacity = 512 * 1024;
+  
+  public QuorumJournalManager(Configuration conf,
+      URI uri, NamespaceInfo nsInfo) throws IOException {
+    this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY);
+  }
+  
+  QuorumJournalManager(Configuration conf,
+      URI uri, NamespaceInfo nsInfo,
+      AsyncLogger.Factory loggerFactory) throws IOException {
+    Preconditions.checkArgument(conf != null, "must be configured");
+
+    this.conf = conf;
+    this.uri = uri;
+    this.nsInfo = nsInfo;
+    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
+
+    // Configure timeouts.
+    this.startSegmentTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT);
+    this.prepareRecoveryTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT);
+    this.acceptRecoveryTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT);
+    this.finalizeSegmentTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT);
+    this.selectInputStreamsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
+    this.getJournalStateTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
+    this.newEpochTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
+    this.writeTxnsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
+  }
+  
+  protected List<AsyncLogger> createLoggers(
+      AsyncLogger.Factory factory) throws IOException {
+    return createLoggers(conf, uri, nsInfo, factory);
+  }
+
+  static String parseJournalId(URI uri) {
+    String path = uri.getPath();
+    Preconditions.checkArgument(path != null && !path.isEmpty(),
+        "Bad URI '%s': must identify journal in path component",
+        uri);
+    String journalId = path.substring(1);
+    checkJournalId(journalId);
+    return journalId;
+  }
+  
+  public static void checkJournalId(String jid) {
+    Preconditions.checkArgument(jid != null &&
+        !jid.isEmpty() &&
+        !jid.contains("/") &&
+        !jid.startsWith("."),
+        "bad journal id: " + jid);
+  }
+
+  
+  /**
+   * Fence any previous writers, and obtain a unique epoch number
+   * for write-access to the journal nodes.
+   *
+   * @return the new, unique epoch number
+   */
+  Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
+      throws IOException {
+    Preconditions.checkState(!loggers.isEpochEstablished(),
+        "epoch already created");
+    
+    Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
+      loggers.waitForWriteQuorum(loggers.getJournalState(),
+          getJournalStateTimeoutMs, "getJournalState()");
+    
+    long maxPromised = Long.MIN_VALUE;
+    for (GetJournalStateResponseProto resp : lastPromises.values()) {
+      maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
+    }
+    assert maxPromised >= 0;
+    
+    long myEpoch = maxPromised + 1;
+    Map<AsyncLogger, NewEpochResponseProto> resps =
+        loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
+            newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
+        
+    loggers.setEpoch(myEpoch);
+    return resps;
+  }
+  
+  @Override
+  public void format(NamespaceInfo nsInfo) throws IOException {
+    QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS,
+          "format");
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for format() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for format() response");
+    }
+    
+    if (call.countExceptions() > 0) {
+      call.rethrowException("Could not format one or more JournalNodes");
+    }
+  }
+
+  @Override
+  public boolean hasSomeData() throws IOException {
+    QuorumCall<AsyncLogger, Boolean> call =
+        loggers.isFormatted();
+
+    try {
+      call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while determining if JNs have data");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for response from loggers");
+    }
+    
+    if (call.countExceptions() > 0) {
+      call.rethrowException(
+          "Unable to check if JNs are ready for formatting");
+    }
+    
+    // If any of the loggers reported that it already has data, then
+    // we should prompt before formatting.
+    for (Boolean hasData : call.getResults().values()) {
+      if (hasData) {
+        return true;
+      }
+    }
+
+    // Otherwise, none were formatted, so we can safely format.
+    return false;
+  }
+
+  /**
+   * Run recovery/synchronization for a specific segment.
+   * Postconditions:
+   * <ul>
+   * <li>This segment will be finalized on a majority
+   * of nodes.</li>
+   * <li>All nodes which contain the finalized segment will
+   * agree on the length.</li>
+   * </ul>
+   * 
+   * @param segmentTxId the starting txid of the segment
+   * @throws IOException
+   */
+  private void recoverUnclosedSegment(long segmentTxId) throws IOException {
+    Preconditions.checkArgument(segmentTxId > 0);
+    LOG.info("Beginning recovery of unclosed segment starting at txid " +
+        segmentTxId);
+    
+    // Step 1. Prepare recovery
+    QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
+        loggers.prepareRecovery(segmentTxId);
+    Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses =
+        loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
+            "prepareRecovery(" + segmentTxId + ")");
+    LOG.info("Recovery prepare phase complete. Responses:\n" +
+        QuorumCall.mapToString(prepareResponses));
+
+    // Determine the logger who either:
+    // a) Has already accepted a previous proposal that's higher than any
+    //    other
+    //
+    //  OR, if no such logger exists:
+    //
+    // b) Has the longest log starting at this transaction ID
+    
+    // TODO: we should collect any "ties" and pass the URL for all of them
+    // when syncing, so we can tolerate failure during recovery better.
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
+        prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE); 
+    AsyncLogger bestLogger = bestEntry.getKey();
+    PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();
+    
+    // Log the above decision, check invariants.
+    if (bestResponse.hasAcceptedInEpoch()) {
+      LOG.info("Using already-accepted recovery for segment " +
+          "starting at txid " + segmentTxId + ": " +
+          bestEntry);
+    } else if (bestResponse.hasSegmentState()) {
+      LOG.info("Using longest log: " + bestEntry);
+    } else {
+      // None of the responses to prepareRecovery() had a segment at the given
+      // txid. This can happen for example in the following situation:
+      // - 3 JNs: JN1, JN2, JN3
+      // - writer starts segment 101 on JN1, then crashes before
+      //   writing to JN2 and JN3
+      // - during newEpoch(), we saw the segment on JN1 and decide to
+      //   recover segment 101
+      // - before prepare(), JN1 crashes, and we only talk to JN2 and JN3,
+      //   neither of which has any entry for this log.
+      // In this case, it is allowed to do nothing for recovery, since the
+      // segment wasn't started on a quorum of nodes.
+
+      // Sanity check: we should only get here if none of the responses had
+      // a log. This should be a postcondition of the recovery comparator,
+      // but a bug in the comparator might cause us to get here.
+      for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
+        assert !resp.hasSegmentState() :
+          "One of the loggers had a response, but no best logger " +
+          "was found.";
+      }
+
+      LOG.info("None of the responders had a log to recover: " +
+          QuorumCall.mapToString(prepareResponses));
+      return;
+    }
+    
+    SegmentStateProto logToSync = bestResponse.getSegmentState();
+    assert segmentTxId == logToSync.getStartTxId();
+    
+    // Sanity check: none of the loggers should be aware of a higher
+    // txid than the txid we intend to truncate to
+    for (Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> e :
+         prepareResponses.entrySet()) {
+      AsyncLogger logger = e.getKey();
+      PrepareRecoveryResponseProto resp = e.getValue();
+
+      if (resp.hasLastCommittedTxId() &&
+          resp.getLastCommittedTxId() > logToSync.getEndTxId()) {
+        throw new AssertionError("Decided to synchronize log to " + logToSync +
+            " but logger " + logger + " had seen txid " +
+            resp.getLastCommittedTxId() + " committed");
+      }
+    }
+    
+    URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
+    
+    QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
+    loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
+        "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
+
+    // If one of the loggers above missed the synchronization step above, but
+    // we send a finalize() here, that's OK. It validates the log before
+    // finalizing. Hence, even if it is not "in sync", it won't incorrectly
+    // finalize.
+    QuorumCall<AsyncLogger, Void> finalize =
+        loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId()); 
+    loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
+        String.format("finalizeLogSegment(%s-%s)",
+            logToSync.getStartTxId(),
+            logToSync.getEndTxId()));
+  }
+  
+  static List<AsyncLogger> createLoggers(Configuration conf,
+      URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
+          throws IOException {
+    List<AsyncLogger> ret = Lists.newArrayList();
+    List<InetSocketAddress> addrs = getLoggerAddresses(uri);
+    String jid = parseJournalId(uri);
+    for (InetSocketAddress addr : addrs) {
+      ret.add(factory.createLogger(conf, nsInfo, jid, addr));
+    }
+    return ret;
+  }
+ 
+  private static List<InetSocketAddress> getLoggerAddresses(URI uri)
+      throws IOException {
+    String authority = uri.getAuthority();
+    Preconditions.checkArgument(authority != null && !authority.isEmpty(),
+        "URI has no authority: " + uri);
+    
+    String[] parts = StringUtils.split(authority, ';');
+    for (int i = 0; i < parts.length; i++) {
+      parts[i] = parts[i].trim();
+    }
+
+    if (parts.length % 2 == 0) {
+      LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
+          "of Journal Nodes specified. This is not recommended!");
+    }
+    
+    List<InetSocketAddress> addrs = Lists.newArrayList();
+    for (String addr : parts) {
+      addrs.add(NetUtils.createSocketAddr(
+          addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT));
+    }
+    return addrs;
+  }
+  
+  @Override
+  public EditLogOutputStream startLogSegment(long txId) throws IOException {
+    Preconditions.checkState(isActiveWriter,
+        "must recover segments before starting a new one");
+    QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
+    loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
+        "startLogSegment(" + txId + ")");
+    return new QuorumOutputStream(loggers, txId,
+        outputBufferCapacity, writeTxnsTimeoutMs);
+  }
+
+  @Override
+  public void finalizeLogSegment(long firstTxId, long lastTxId)
+      throws IOException {
+    QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
+        firstTxId, lastTxId);
+    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
+        String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
+  }
+
+  @Override
+  public void setOutputBufferCapacity(int size) {
+    outputBufferCapacity = size;
+  }
+
+  @Override
+  public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+    // This purges asynchronously -- there's no need to wait for a quorum
+    // here, because it's always OK to fail.
+    LOG.info("Purging remote journals older than txid " + minTxIdToKeep);
+    loggers.purgeLogsOlderThan(minTxIdToKeep);
+  }
+
+  @Override
+  public void recoverUnfinalizedSegments() throws IOException {
+    Preconditions.checkState(!isActiveWriter, "already active writer");
+    
+    LOG.info("Starting recovery process for unclosed journal segments...");
+    Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
+    LOG.info("Successfully started new epoch " + loggers.getEpoch());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
+        QuorumCall.mapToString(resps));
+    }
+    
+    long mostRecentSegmentTxId = Long.MIN_VALUE;
+    for (NewEpochResponseProto r : resps.values()) {
+      if (r.hasLastSegmentTxId()) {
+        mostRecentSegmentTxId = Math.max(mostRecentSegmentTxId,
+            r.getLastSegmentTxId());
+      }
+    }
+    
+    // On a completely fresh system, none of the journals have any
+    // segments, so there's nothing to recover.
+    if (mostRecentSegmentTxId != Long.MIN_VALUE) {
+      recoverUnclosedSegment(mostRecentSegmentTxId);
+    }
+    isActiveWriter = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    loggers.close();
+  }
+
+  @Override
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) throws IOException {
+
+    QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
+        loggers.getEditLogManifest(fromTxnId);
+    Map<AsyncLogger, RemoteEditLogManifest> resps =
+        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+            "selectInputStreams");
+    
+    LOG.debug("selectInputStream manifests:\n" +
+        Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
+    
+    final PriorityQueue<EditLogInputStream> allStreams = 
+        new PriorityQueue<EditLogInputStream>(64,
+            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
+      AsyncLogger logger = e.getKey();
+      RemoteEditLogManifest manifest = e.getValue();
+      
+      for (RemoteEditLog remoteLog : manifest.getLogs()) {
+        URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
+
+        EditLogInputStream elis = EditLogFileInputStream.fromUrl(
+            url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
+            remoteLog.isInProgress());
+        allStreams.add(elis);
+      }
+    }
+    JournalSet.chainAndMakeRedundantStreams(
+        streams, allStreams, fromTxnId, inProgressOk);
+  }
+  
+  @Override
+  public String toString() {
+    return "QJM to " + loggers;
+  }
+
+  @VisibleForTesting
+  AsyncLoggerSet getLoggerSetForTests() {
+    return loggers;
+  }
+
+}
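
For reference (not part of this commit), the shared-edits URI that
parseJournalId() and getLoggerAddresses() above expect carries the
semicolon-separated JournalNode addresses in the authority and the journal
id in the path component; the host names and journal id below are
hypothetical.

    URI uri = URI.create("qjournal://jn1.example.com:8485;" +
        "jn2.example.com:8485;jn3.example.com:8485/mycluster");
    String jid = QuorumJournalManager.parseJournalId(uri);  // "mycluster"
    // getLoggerAddresses(uri) would then yield one InetSocketAddress per
    // semicolon-separated host, falling back to the default JournalNode
    // RPC port when an address omits one.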

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditsDoubleBuffer;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * EditLogOutputStream implementation that writes to a quorum of
+ * remote journals.
+ */
+class QuorumOutputStream extends EditLogOutputStream {
+  private final AsyncLoggerSet loggers;
+  private EditsDoubleBuffer buf;
+  private final long segmentTxId;
+  private final int writeTimeoutMs;
+
+  public QuorumOutputStream(AsyncLoggerSet loggers,
+      long txId, int outputBufferCapacity,
+      int writeTimeoutMs) throws IOException {
+    super();
+    this.buf = new EditsDoubleBuffer(outputBufferCapacity);
+    this.loggers = loggers;
+    this.segmentTxId = txId;
+    this.writeTimeoutMs = writeTimeoutMs;
+  }
+
+  @Override
+  public void write(FSEditLogOp op) throws IOException {
+    buf.writeOp(op);
+  }
+
+  @Override
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    buf.writeRaw(bytes, offset, length);
+  }
+
+  @Override
+  public void create() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (buf != null) {
+      buf.close();
+      buf = null;
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    QuorumJournalManager.LOG.warn("Aborting " + this);
+    buf = null;
+    close();
+  }
+
+  @Override
+  public void setReadyToFlush() throws IOException {
+    buf.setReadyToFlush();
+  }
+
+  @Override
+  protected void flushAndSync(boolean durable) throws IOException {
+    int numReadyBytes = buf.countReadyBytes();
+    if (numReadyBytes > 0) {
+      int numReadyTxns = buf.countReadyTxns();
+      long firstTxToFlush = buf.getFirstReadyTxId();
+
+      assert numReadyTxns > 0;
+
+      // Copy from our double-buffer into a new byte array. This is for
+      // two reasons:
+      // 1) The IPC code has no way of sending only a slice of a larger
+      //    array.
+      // 2) Because the calls to the underlying nodes are asynchronous, we
+      //    need a defensive copy to avoid accidentally mutating the buffer
+      //    before it is sent.
+      DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
+      buf.flushTo(bufToSend);
+      assert bufToSend.getLength() == numReadyBytes;
+      byte[] data = bufToSend.getData();
+      assert data.length == bufToSend.getLength();
+
+      QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
+          segmentTxId, firstTxToFlush,
+          numReadyTxns, data);
+      loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");
+      
+      // Since we successfully wrote this batch, record the new committed txid.
+      // Any future RPCs will then carry the most recent committed transaction
+      // to the loggers, even if a logger has fallen behind.
+      loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
+    }
+  }
+
+  @Override
+  public String generateHtmlReport() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Writing segment beginning at txid " + segmentTxId + "<br/>\n");
+    loggers.appendHtmlReport(sb);
+    return sb.toString();
+  }
+  
+  @Override
+  public String toString() {
+    return "QuorumOutputStream starting at txid " + segmentTxId;
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.util.Comparator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.primitives.Booleans;
+
+/**
+ * Compares responses to the prepareRecovery RPC. This is responsible for
+ * determining the correct length to recover.
+ */
+class SegmentRecoveryComparator
+    implements Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>> {
+
+  static final SegmentRecoveryComparator INSTANCE = new SegmentRecoveryComparator();
+  
+  @Override
+  public int compare(
+      Entry<AsyncLogger, PrepareRecoveryResponseProto> a,
+      Entry<AsyncLogger, PrepareRecoveryResponseProto> b) {
+    
+    PrepareRecoveryResponseProto r1 = a.getValue();
+    PrepareRecoveryResponseProto r2 = b.getValue();
+    
+    // A response that has data for a segment is always better than one
+    // that doesn't.
+    if (r1.hasSegmentState() != r2.hasSegmentState()) {
+      return Booleans.compare(r1.hasSegmentState(), r2.hasSegmentState());
+    }
+    
+    if (!r1.hasSegmentState()) {
+      // Neither has a segment, so neither can be used for recovery.
+      // Call them equal.
+      return 0;
+    }
+    
+    // They both have a segment.
+    SegmentStateProto r1Seg = r1.getSegmentState();
+    SegmentStateProto r2Seg = r2.getSegmentState();
+    
+    Preconditions.checkArgument(r1Seg.getStartTxId() == r2Seg.getStartTxId(),
+        "Should only be called with responses for corresponding segments: " +
+        "%s and %s do not have the same start txid.", r1, r2);
+
+    // If one is in-progress but the other is finalized,
+    // the finalized one is greater.
+    if (r1Seg.getIsInProgress() != r2Seg.getIsInProgress()) {
+      return Booleans.compare(!r1Seg.getIsInProgress(), !r2Seg.getIsInProgress());
+    }
+    
+    if (!r1Seg.getIsInProgress()) {
+      // If both are finalized, they should match lengths
+      if (r1Seg.getEndTxId() != r2Seg.getEndTxId()) {
+        throw new AssertionError("finalized segs with different lengths: " + 
+            r1 + ", " + r2);
+      }
+      return 0;
+    }
+    
+    // Both are in-progress.
+    long r1SeenEpoch = Math.max(r1.getAcceptedInEpoch(), r1.getLastWriterEpoch());
+    long r2SeenEpoch = Math.max(r2.getAcceptedInEpoch(), r2.getLastWriterEpoch());
+    
+    return ComparisonChain.start()
+        .compare(r1SeenEpoch, r2SeenEpoch)
+        .compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId())
+        .result();
+  }
+}
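
A hedged illustration (not part of this commit) of the ordering this
comparator produces, using two hypothetical prepareRecovery responses for
the same in-progress segment. It assumes the protobuf response classes above
and Guava's Maps.immutableEntry are in scope; the null keys are placeholders
only, since compare() never reads the keys.

    SegmentStateProto seg = SegmentStateProto.newBuilder()
        .setStartTxId(101).setEndTxId(150).setIsInProgress(true).build();
    PrepareRecoveryResponseProto r1 = PrepareRecoveryResponseProto.newBuilder()
        .setSegmentState(seg).setLastWriterEpoch(2).build();
    PrepareRecoveryResponseProto r2 = PrepareRecoveryResponseProto.newBuilder()
        .setSegmentState(seg).setLastWriterEpoch(2).setAcceptedInEpoch(3).build();
    // r2 has seen a recovery accepted in a later epoch (3 vs. 2), so it
    // sorts higher, and Collections.max() in recoverUnclosedSegment() would
    // pick it as the entry to sync from.
    int c = SegmentRecoveryComparator.INSTANCE.compare(
        Maps.immutableEntry((AsyncLogger) null, r1),
        Maps.immutableEntry((AsyncLogger) null, r2));
    assert c < 0;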

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+
+/**
+ * Exception indicating that a call has been made to a JournalNode
+ * which is not yet formatted.
+ */
+@InterfaceAudience.Private
+public class JournalNotFormattedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  
+  public JournalNotFormattedException(String msg) {
+    super(msg);
+  }
+
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class JournalOutOfSyncException extends IOException {
+  private static final long serialVersionUID = 1L;
+  
+  public JournalOutOfSyncException(String msg) {
+    super(msg);
+  }
+
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to communicate between {@link QuorumJournalManager}
+ * and each {@link JournalNode}.
+ * 
+ * This is responsible for sending edits as well as coordinating
+ * recovery of the nodes.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface QJournalProtocol {
+  public static final long versionID = 1L;
+
+  /**
+   * @return true if the given journal has been formatted and
+   * contains valid data.
+   */
+  public boolean isFormatted(String journalId) throws IOException;
+
+  /**
+   * Get the current state of the journal, including the most recent
+   * epoch number and the HTTP port.
+   */
+  public GetJournalStateResponseProto getJournalState(String journalId)
+      throws IOException;
+  
+  /**
+   * Format the underlying storage for the given namespace.
+   */
+  public void format(String journalId,
+      NamespaceInfo nsInfo) throws IOException;
+
+  /**
+   * Begin a new epoch. See the HDFS-3077 design doc for details.
+   */
+  public NewEpochResponseProto newEpoch(String journalId,
+      NamespaceInfo nsInfo, long epoch) throws IOException;
+  
+  /**
+   * Journal edit records.
+   * This message is sent by the active name-node to the JournalNodes
+   * to write edits to their local logs.
+   */
+  public void journal(RequestInfo reqInfo,
+                      long segmentTxId,
+                      long firstTxnId,
+                      int numTxns,
+                      byte[] records) throws IOException;
+
+  
+  /**
+   * Heartbeat.
+   * This is a no-op on the server, except that it verifies that the
+   * caller is in fact still the active writer, and provides up-to-date
+   * information on the most recently committed txid.
+   */
+  public void heartbeat(RequestInfo reqInfo) throws IOException;
+  
+  /**
+   * Start writing to a new log segment on the JournalNode.
+   * Before calling this, one should finalize the previous segment
+   * using {@link #finalizeLogSegment(RequestInfo, long, long)}.
+   * 
+   * @param txid the first txid in the new log
+   */
+  public void startLogSegment(RequestInfo reqInfo,
+      long txid) throws IOException;
+
+  /**
+   * Finalize the given log segment on the JournalNode. The segment
+   * is expected to be in-progress and starting at the given startTxId.
+   *
+   * @param startTxId the starting transaction ID of the log
+   * @param endTxId the expected last transaction in the given log
+   * @throws IOException if no such segment exists
+   */
+  public void finalizeLogSegment(RequestInfo reqInfo,
+      long startTxId, long endTxId) throws IOException;
+
+  /**
+   * @throws IOException 
+   * @see JournalManager#purgeLogsOlderThan(long)
+   */
+  public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep)
+      throws IOException;
+  
+  /**
+   * @param jid the journal from which to enumerate edits
+   * @param sinceTxId the first transaction which the client cares about
+   * @return a list of edit log segments since the given transaction ID.
+   */
+  public GetEditLogManifestResponseProto getEditLogManifest(
+      String jid, long sinceTxId) throws IOException;
+  
+  /**
+   * Begin the recovery process for a given segment. See the HDFS-3077
+   * design document for details.
+   */
+  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+      long segmentTxId) throws IOException;
+
+  /**
+   * Accept a proposed recovery for the given transaction ID.
+   */
+  public void acceptRecovery(RequestInfo reqInfo,
+      SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
+}
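
A hedged sketch (not part of this commit) of the call sequence a writer is
expected to make against this interface; the journal id, txids, and the
proto, reqInfo, nsInfo, numTxns, and records values are hypothetical.

    // 1. Fence earlier writers by establishing a higher epoch.
    GetJournalStateResponseProto state = proto.getJournalState("mycluster");
    NewEpochResponseProto epochResp =
        proto.newEpoch("mycluster", nsInfo, state.getLastPromisedEpoch() + 1);

    // 2. Write and finalize a segment of edits under that epoch.
    proto.startLogSegment(reqInfo, 101);
    proto.journal(reqInfo, 101, 101, numTxns, records);
    proto.finalizeLogSegment(reqInfo, 101, 150);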

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+@InterfaceAudience.Private
+public class RequestInfo {
+  private String jid;
+  private long epoch;
+  private long ipcSerialNumber;
+  private long committedTxId;
+  
+  public RequestInfo(String jid, long epoch, long ipcSerialNumber,
+      long committedTxId) {
+    this.jid = jid;
+    this.epoch = epoch;
+    this.ipcSerialNumber = ipcSerialNumber;
+    this.committedTxId = committedTxId;
+  }
+
+  public long getEpoch() {
+    return epoch;
+  }
+
+  public void setEpoch(long epoch) {
+    this.epoch = epoch;
+  }
+  
+  public String getJournalId() {
+    return jid;
+  }
+
+  public long getIpcSerialNumber() {
+    return ipcSerialNumber;
+  }
+
+  public void setIpcSerialNumber(long ipcSerialNumber) {
+    this.ipcSerialNumber = ipcSerialNumber;
+  }
+
+  public long getCommittedTxId() {
+    return committedTxId;
+  }
+
+  public boolean hasCommittedTxId() {
+    return (committedTxId != HdfsConstants.INVALID_TXID);
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to journal edits to a JournalNode participating
+ * in the quorum journal.
+ * Note: This extends the protocol buffer service-based interface to
+ * add annotations required for security.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@ProtocolInfo(protocolName = 
+    "org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface QJournalProtocolPB extends
+    QJournalProtocolService.BlockingInterface {
+}
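
A hedged sketch of the usual client-side wiring for a protobuf interface like this one; 'conf' and 'addr' (the JournalNode's IPC address) are placeholders:

    // Register the protobuf RPC engine for the interface, then obtain a
    // blocking proxy to the JournalNode at 'addr'.
    RPC.setProtocolEngine(conf, QJournalProtocolPB.class, ProtobufRpcEngine.class);
    QJournalProtocolPB proxy = RPC.getProxy(
        QJournalProtocolPB.class,
        RPC.getProtocolVersion(QJournalProtocolPB.class),
        addr, conf);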

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.net.URL;
+
+/**
+ * Implementation of the protobuf service that forwards requests
+ * received on {@link QJournalProtocolPB} to the
+ * {@link QJournalProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolPB {
+  /** Server side implementation to delegate the requests to */
+  private final QJournalProtocol impl;
+
+  public QJournalProtocolServerSideTranslatorPB(QJournalProtocol impl) {
+    this.impl = impl;
+  }
+
+  
+  @Override
+  public IsFormattedResponseProto isFormatted(RpcController controller,
+      IsFormattedRequestProto request) throws ServiceException {
+    try {
+      boolean ret = impl.isFormatted(
+          convert(request.getJid()));
+      return IsFormattedResponseProto.newBuilder()
+          .setIsFormatted(ret)
+          .build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+
+  @Override
+  public GetJournalStateResponseProto getJournalState(RpcController controller,
+      GetJournalStateRequestProto request) throws ServiceException {
+    try {
+      return impl.getJournalState(
+          convert(request.getJid()));
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  private String convert(JournalIdProto jid) {
+    return jid.getIdentifier();
+  }
+
+  @Override
+  public NewEpochResponseProto newEpoch(RpcController controller,
+      NewEpochRequestProto request) throws ServiceException {
+    try {
+      return impl.newEpoch(
+          request.getJid().getIdentifier(),
+          PBHelper.convert(request.getNsInfo()),
+          request.getEpoch());
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  @Override
+  public FormatResponseProto format(RpcController controller,
+      FormatRequestProto request) throws ServiceException {
+    try {
+      impl.format(request.getJid().getIdentifier(),
+          PBHelper.convert(request.getNsInfo()));
+      return FormatResponseProto.getDefaultInstance();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+  
+  /** @see QJournalProtocol#journal */
+  @Override
+  public JournalResponseProto journal(RpcController unused,
+      JournalRequestProto req) throws ServiceException {
+    try {
+      impl.journal(convert(req.getReqInfo()),
+          req.getSegmentTxnId(), req.getFirstTxnId(),
+          req.getNumTxns(), req.getRecords().toByteArray());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return JournalResponseProto.newBuilder().build();
+  }
+
+  /** @see QJournalProtocol#heartbeat */
+  @Override
+  public HeartbeatResponseProto heartbeat(RpcController controller,
+      HeartbeatRequestProto req) throws ServiceException {
+    try {
+      impl.heartbeat(convert(req.getReqInfo()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return HeartbeatResponseProto.getDefaultInstance();
+  }
+
+  /** @see QJournalProtocol#startLogSegment */
+  @Override
+  public StartLogSegmentResponseProto startLogSegment(RpcController controller,
+      StartLogSegmentRequestProto req) throws ServiceException {
+    try {
+      impl.startLogSegment(convert(req.getReqInfo()),
+          req.getTxid());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return StartLogSegmentResponseProto.newBuilder().build();
+  }
+  
+  @Override
+  public FinalizeLogSegmentResponseProto finalizeLogSegment(
+      RpcController controller, FinalizeLogSegmentRequestProto req)
+      throws ServiceException {
+    try {
+      impl.finalizeLogSegment(convert(req.getReqInfo()),
+          req.getStartTxId(), req.getEndTxId());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return FinalizeLogSegmentResponseProto.newBuilder().build();
+  }
+  
+  @Override
+  public PurgeLogsResponseProto purgeLogs(RpcController controller,
+      PurgeLogsRequestProto req) throws ServiceException {
+    try {
+      impl.purgeLogsOlderThan(convert(req.getReqInfo()),
+          req.getMinTxIdToKeep());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return PurgeLogsResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(
+      RpcController controller, GetEditLogManifestRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getEditLogManifest(
+          request.getJid().getIdentifier(),
+          request.getSinceTxId());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+
+  @Override
+  public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
+      PrepareRecoveryRequestProto request) throws ServiceException {
+    try {
+      return impl.prepareRecovery(convert(request.getReqInfo()),
+          request.getSegmentTxId());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public AcceptRecoveryResponseProto acceptRecovery(RpcController controller,
+      AcceptRecoveryRequestProto request) throws ServiceException {
+    try {
+      impl.acceptRecovery(convert(request.getReqInfo()),
+          request.getStateToAccept(),
+          new URL(request.getFromURL()));
+      return AcceptRecoveryResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  
+  private RequestInfo convert(
+      QJournalProtocolProtos.RequestInfoProto reqInfo) {
+    return new RequestInfo(
+        reqInfo.getJournalId().getIdentifier(),
+        reqInfo.getEpoch(),
+        reqInfo.getIpcSerialNumber(),
+        reqInfo.hasCommittedTxId() ?
+          reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID);
+  }
+}
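
For orientation, a sketch (not part of this commit) of how a JournalNode-style daemon could publish a QJournalProtocol implementation over protobuf RPC via this translator; the method name, bind address and port are assumptions:

    static RPC.Server startRpcServer(Configuration conf,
        QJournalProtocol journalImpl, String bindAddr, int port)
        throws IOException {
      // Wrap the implementation in the server-side translator and expose it
      // as the generated blocking service.
      QJournalProtocolServerSideTranslatorPB translator =
          new QJournalProtocolServerSideTranslatorPB(journalImpl);
      BlockingService service =
          QJournalProtocolService.newReflectiveBlockingService(translator);
      RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
          ProtobufRpcEngine.class);
      RPC.Server server = new RPC.Builder(conf)
          .setProtocol(QJournalProtocolPB.class)
          .setInstance(service)
          .setBindAddress(bindAddr)
          .setPort(port)
          .build();
      server.start();
      return server;
    }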

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.RequestInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client-side translator that forwards requests made on the
+ * {@link QJournalProtocol} interface to the RPC server implementing
+ * {@link QJournalProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
+    QJournalProtocol, Closeable {
+  /** RpcController is not used and hence is set to null */
+  private final static RpcController NULL_CONTROLLER = null;
+  private final QJournalProtocolPB rpcProxy;
+  
+  public QJournalProtocolTranslatorPB(QJournalProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public void close() {
+    RPC.stopProxy(rpcProxy);
+  }
+
+
+  @Override
+  public boolean isFormatted(String journalId) throws IOException {
+    try {
+      IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder()
+          .setJid(convertJournalId(journalId))
+          .build();
+      IsFormattedResponseProto resp = rpcProxy.isFormatted(
+          NULL_CONTROLLER, req);
+      return resp.getIsFormatted();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public GetJournalStateResponseProto getJournalState(String jid)
+      throws IOException {
+    try {
+      GetJournalStateRequestProto req = GetJournalStateRequestProto.newBuilder()
+          .setJid(convertJournalId(jid))
+          .build();
+      return rpcProxy.getJournalState(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  private JournalIdProto convertJournalId(String jid) {
+    return JournalIdProto.newBuilder()
+        .setIdentifier(jid)
+        .build();
+  }
+  
+  @Override
+  public void format(String jid, NamespaceInfo nsInfo) throws IOException {
+    try {
+      FormatRequestProto req = FormatRequestProto.newBuilder()
+          .setJid(convertJournalId(jid))
+          .setNsInfo(PBHelper.convert(nsInfo))
+          .build();
+      rpcProxy.format(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
+      long epoch) throws IOException {
+    try {
+      NewEpochRequestProto req = NewEpochRequestProto.newBuilder()
+        .setJid(convertJournalId(jid))
+        .setNsInfo(PBHelper.convert(nsInfo))
+        .setEpoch(epoch)
+        .build();
+      return rpcProxy.newEpoch(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void journal(RequestInfo reqInfo,
+      long segmentTxId, long firstTxnId, int numTxns,
+      byte[] records) throws IOException {
+    JournalRequestProto req = JournalRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setSegmentTxnId(segmentTxId)
+        .setFirstTxnId(firstTxnId)
+        .setNumTxns(numTxns)
+        .setRecords(PBHelper.getByteString(records))
+        .build();
+    try {
+      rpcProxy.journal(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+  
+  @Override
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    try {
+      rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  private QJournalProtocolProtos.RequestInfoProto convert(
+      RequestInfo reqInfo) {
+    RequestInfoProto.Builder builder = RequestInfoProto.newBuilder()
+        .setJournalId(convertJournalId(reqInfo.getJournalId()))
+        .setEpoch(reqInfo.getEpoch())
+        .setIpcSerialNumber(reqInfo.getIpcSerialNumber());
+    if (reqInfo.hasCommittedTxId()) {
+      builder.setCommittedTxId(reqInfo.getCommittedTxId());
+    }
+    return builder.build();
+  }
+
+  @Override
+  public void startLogSegment(RequestInfo reqInfo, long txid)
+      throws IOException {
+    StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setTxid(txid)
+        .build();
+    try {
+      rpcProxy.startLogSegment(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+  
+  @Override
+  public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+      long endTxId) throws IOException {
+    FinalizeLogSegmentRequestProto req =
+        FinalizeLogSegmentRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setStartTxId(startTxId)
+        .setEndTxId(endTxId)
+        .build();
+    try {
+      rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+  
+  @Override
+  public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
+      throws IOException {
+    PurgeLogsRequestProto req = PurgeLogsRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setMinTxIdToKeep(minTxIdToKeep)
+        .build();
+    try {
+      rpcProxy.purgeLogs(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(String jid,
+      long sinceTxId) throws IOException {
+    try {
+      return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
+          GetEditLogManifestRequestProto.newBuilder()
+            .setJid(convertJournalId(jid))
+            .setSinceTxId(sinceTxId)
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+      long segmentTxId) throws IOException {
+    try {
+      return rpcProxy.prepareRecovery(NULL_CONTROLLER,
+          PrepareRecoveryRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .setSegmentTxId(segmentTxId)
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void acceptRecovery(RequestInfo reqInfo,
+      SegmentStateProto stateToAccept, URL fromUrl) throws IOException {
+    try {
+      rpcProxy.acceptRecovery(NULL_CONTROLLER,
+          AcceptRecoveryRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .setStateToAccept(stateToAccept)
+            .setFromURL(fromUrl.toExternalForm())
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public boolean isMethodSupported(String methodName) throws IOException {
+    return RpcClientUtil.isMethodSupported(rpcProxy,
+        QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
+  }
+
+}
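
Tying the two translators together, a minimal hedged usage sketch: wrap a QJournalProtocolPB proxy (see the note after QJournalProtocolPB above) in this class and call the plain QJournalProtocol API; 'pbProxy' and the journal id are placeholders:

    QJournalProtocolTranslatorPB journal =
        new QJournalProtocolTranslatorPB(pbProxy);
    boolean formatted = journal.isFormatted("myjournal");
    journal.close();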