Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/12/05 20:22:25 UTC
svn commit: r1417596 [2/6] - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/
src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/
src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+/**
+ * Represents a set of calls for which a quorum of results is needed.
+ * @param <KEY> a key used to identify each of the outgoing calls
+ * @param <RESULT> the type of the call result
+ */
+class QuorumCall<KEY, RESULT> {
+ private final Map<KEY, RESULT> successes = Maps.newHashMap();
+ private final Map<KEY, Throwable> exceptions = Maps.newHashMap();
+
+ /**
+ * Interval, in milliseconds, at which a log message will be made
+ * while waiting for a quorum call.
+ */
+ private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
+
+ /**
+ * Start logging messages at INFO level periodically after waiting for
+ * this fraction of the configured timeout for any call.
+ */
+ private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
+ /**
+ * Start logging messages at WARN level after waiting for this
+ * fraction of the configured timeout for any call.
+ */
+ private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
+
+ static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
+ Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
+ final QuorumCall<KEY, RESULT> qr = new QuorumCall<KEY, RESULT>();
+ for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
+ Preconditions.checkArgument(e.getValue() != null,
+ "null future for key: " + e.getKey());
+ Futures.addCallback(e.getValue(), new FutureCallback<RESULT>() {
+ @Override
+ public void onFailure(Throwable t) {
+ qr.addException(e.getKey(), t);
+ }
+
+ @Override
+ public void onSuccess(RESULT res) {
+ qr.addResult(e.getKey(), res);
+ }
+ });
+ }
+ return qr;
+ }
+
+ private QuorumCall() {
+ // Only instantiated from factory method above
+ }
+
+ /**
+ * Wait for the quorum to achieve a certain number of responses.
+ *
+ * Note that, even after this returns, more responses may arrive,
+ * causing the return value of other methods in this class to change.
+ *
+ * @param minResponses return as soon as this many responses have been
+ * received, regardless of whether they are successes or exceptions
+ * @param minSuccesses return as soon as this many successful (non-exception)
+ * responses have been received
+ * @param maxExceptions return as soon as more than this many exception
+ * responses have been received. Pass 0 to return immediately if any
+ * exception is received.
+ * @param millis the maximum number of milliseconds to wait
+ * @throws InterruptedException if the thread is interrupted while waiting
+ * @throws TimeoutException if the specified timeout elapses before
+ * achieving the desired conditions
+ */
+ public synchronized void waitFor(
+ int minResponses, int minSuccesses, int maxExceptions,
+ int millis, String operationName)
+ throws InterruptedException, TimeoutException {
+ long st = Time.monotonicNow();
+ long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
+ long et = st + millis;
+ while (true) {
+ checkAssertionErrors();
+ if (minResponses > 0 && countResponses() >= minResponses) return;
+ if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
+ if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
+ long now = Time.monotonicNow();
+
+ if (now > nextLogTime) {
+ long waited = now - st;
+ String msg = String.format(
+ "Waited %s ms (timeout=%s ms) for a response for %s",
+ waited, millis, operationName);
+ if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
+ QuorumJournalManager.LOG.warn(msg);
+ } else {
+ QuorumJournalManager.LOG.info(msg);
+ }
+ nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
+ }
+ long rem = et - now;
+ if (rem <= 0) {
+ throw new TimeoutException();
+ }
+ rem = Math.min(rem, nextLogTime - now);
+ rem = Math.max(rem, 1);
+ wait(rem);
+ }
+ }
+
+ /**
+ * Check if any of the responses came back with an AssertionError.
+ * If so, it re-throws it, even if there was a quorum of responses.
+ * This code only runs if assertions are enabled for this class,
+ * otherwise it should JIT itself away.
+ *
+ * This is done because an AssertionError indicates programmer confusion
+ * rather than an expected failure mode; in the context of test cases,
+ * we want the test to actually fail rather than silently continue.
+ */
+ private synchronized void checkAssertionErrors() {
+ boolean assertsEnabled = false;
+ assert assertsEnabled = true; // sets to true if enabled
+ if (assertsEnabled) {
+ for (Throwable t : exceptions.values()) {
+ if (t instanceof AssertionError) {
+ throw (AssertionError)t;
+ } else if (t instanceof RemoteException &&
+ ((RemoteException)t).getClassName().equals(
+ AssertionError.class.getName())) {
+ throw new AssertionError(t);
+ }
+ }
+ }
+ }
+
+ private synchronized void addResult(KEY k, RESULT res) {
+ successes.put(k, res);
+ notifyAll();
+ }
+
+ private synchronized void addException(KEY k, Throwable t) {
+ exceptions.put(k, t);
+ notifyAll();
+ }
+
+ /**
+ * @return the total number of calls for which a response has been received,
+ * regardless of whether it threw an exception or returned a successful
+ * result.
+ */
+ public synchronized int countResponses() {
+ return successes.size() + exceptions.size();
+ }
+
+ /**
+ * @return the number of calls for which a non-exception response has been
+ * received.
+ */
+ public synchronized int countSuccesses() {
+ return successes.size();
+ }
+
+ /**
+ * @return the number of calls for which an exception response has been
+ * received.
+ */
+ public synchronized int countExceptions() {
+ return exceptions.size();
+ }
+
+ /**
+ * @return the map of successful responses. A copy is made such that this
+ * map will not be further mutated, even if further results arrive for the
+ * quorum.
+ */
+ public synchronized Map<KEY, RESULT> getResults() {
+ return Maps.newHashMap(successes);
+ }
+
+ public synchronized void rethrowException(String msg) throws QuorumException {
+ Preconditions.checkState(!exceptions.isEmpty());
+ throw QuorumException.create(msg, successes, exceptions);
+ }
+
+ public static <K> String mapToString(
+ Map<K, ? extends Message> map) {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (Map.Entry<K, ? extends Message> e : map.entrySet()) {
+ if (!first) {
+ sb.append("\n");
+ }
+ first = false;
+ sb.append(e.getKey()).append(": ")
+ .append(TextFormat.shortDebugString(e.getValue()));
+ }
+ return sb.toString();
+ }
+}
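
QuorumCall is package-private, so production callers reach it through the logger set, but the create()/waitFor() contract above is easiest to see in isolation. The following is a minimal sketch, not part of this commit: it assumes a Guava ListeningExecutorService as the source of futures, and the keys, class name, and task bodies are purely illustrative.

    package org.apache.hadoop.hdfs.qjournal.client;

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeoutException;

    import com.google.common.collect.Maps;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    public class QuorumCallExample {
      public static void main(String[] args) throws Exception {
        ListeningExecutorService executor = MoreExecutors.listeningDecorator(
            Executors.newFixedThreadPool(3));

        // One future per (hypothetical) journal node; the keys identify
        // the outgoing calls.
        Map<String, ListenableFuture<Boolean>> calls = Maps.newHashMap();
        for (String jn : new String[] { "jn1", "jn2", "jn3" }) {
          calls.put(jn, executor.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() {
              return true; // stand-in for a remote RPC result
            }
          }));
        }

        QuorumCall<String, Boolean> q = QuorumCall.create(calls);
        try {
          // Return once 2 of 3 calls have succeeded, return early if any
          // call fails, and give up entirely after 20 seconds.
          q.waitFor(0, 2, 0, 20000, "exampleOp");
          System.out.println("successes: " + q.countSuccesses());
        } catch (TimeoutException te) {
          System.err.println("quorum not reached in time");
        } finally {
          executor.shutdown();
        }
      }
    }
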
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Exception thrown when too many exceptions occur while gathering
+ * responses to a quorum call.
+ */
+class QuorumException extends IOException {
+
+ /**
+ * Create a QuorumException instance with a descriptive message detailing
+ * the underlying exceptions, as well as any successful responses which
+ * were returned.
+ * @param <K> the key type for the quorum calls
+ * @param <V> the success response type
+ * @param simpleMsg a short summary used to prefix the detailed message
+ * @param successes any successful responses returned
+ */
+ public static <K, V> QuorumException create(
+ String simpleMsg,
+ Map<K, V> successes,
+ Map<K, Throwable> exceptions) {
+ Preconditions.checkArgument(!exceptions.isEmpty(),
+ "Must pass exceptions");
+
+ StringBuilder msg = new StringBuilder();
+ msg.append(simpleMsg).append(". ");
+ if (!successes.isEmpty()) {
+ msg.append(successes.size()).append(" successful responses:\n");
+
+ Joiner.on("\n")
+ .useForNull("null [success]")
+ .withKeyValueSeparator(": ")
+ .appendTo(msg, successes);
+ msg.append("\n");
+ }
+
+ msg.append(exceptions.size() + " exceptions thrown:\n");
+ boolean isFirst = true;
+
+ for (Map.Entry<K, Throwable> e : exceptions.entrySet()) {
+ if (!isFirst) {
+ msg.append("\n");
+ }
+ isFirst = false;
+
+ msg.append(e.getKey()).append(": ");
+
+ if (e.getValue() instanceof RuntimeException) {
+ msg.append(StringUtils.stringifyException(e.getValue()));
+ } else if (e.getValue().getLocalizedMessage() != null) {
+ msg.append(e.getValue().getLocalizedMessage());
+ } else {
+ msg.append(StringUtils.stringifyException(e.getValue()));
+ }
+ }
+ return new QuorumException(msg.toString());
+ }
+
+ private QuorumException(String msg) {
+ super(msg);
+ }
+
+ private static final long serialVersionUID = 1L;
+}
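
Since rethrowException() in QuorumCall funnels everything through create() above, the message format is worth seeing once. A small sketch of the shape it produces; the class name, keys, and errors are made up, and HashMap ordering means the entries may print in any order:

    package org.apache.hadoop.hdfs.qjournal.client;

    import java.io.IOException;
    import java.util.Map;

    import com.google.common.collect.Maps;

    public class QuorumExceptionExample {
      public static void main(String[] args) {
        Map<String, String> successes = Maps.newHashMap();
        successes.put("jn1:8485", "lastPromisedEpoch: 3");

        Map<String, Throwable> exceptions = Maps.newHashMap();
        exceptions.put("jn2:8485", new IOException("Connection refused"));
        exceptions.put("jn3:8485", new IOException("Connection refused"));

        QuorumException qe = QuorumException.create(
            "Could not format one or more JournalNodes", successes, exceptions);

        // Prints roughly:
        //   Could not format one or more JournalNodes. 1 successful responses:
        //   jn1:8485: lastPromisedEpoch: 3
        //   2 exceptions thrown:
        //   jn2:8485: Connection refused
        //   jn3:8485: Connection refused
        System.out.println(qe.getMessage());
      }
    }
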
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.TextFormat;
+
+/**
+ * A JournalManager that writes to a set of remote JournalNodes,
+ * requiring a quorum of nodes to ack each write.
+ */
+@InterfaceAudience.Private
+public class QuorumJournalManager implements JournalManager {
+ static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);
+
+ // Timeouts for which the QJM will wait for each of the following actions.
+ private final int startSegmentTimeoutMs;
+ private final int prepareRecoveryTimeoutMs;
+ private final int acceptRecoveryTimeoutMs;
+ private final int finalizeSegmentTimeoutMs;
+ private final int selectInputStreamsTimeoutMs;
+ private final int getJournalStateTimeoutMs;
+ private final int newEpochTimeoutMs;
+ private final int writeTxnsTimeoutMs;
+
+ // Since these don't occur during normal operation, we can
+ // use rather lengthy timeouts, and don't need to make them
+ // configurable.
+ private static final int FORMAT_TIMEOUT_MS = 60000;
+ private static final int HASDATA_TIMEOUT_MS = 60000;
+
+ private final Configuration conf;
+ private final URI uri;
+ private final NamespaceInfo nsInfo;
+ private boolean isActiveWriter;
+
+ private final AsyncLoggerSet loggers;
+
+ private int outputBufferCapacity = 512 * 1024;
+
+ public QuorumJournalManager(Configuration conf,
+ URI uri, NamespaceInfo nsInfo) throws IOException {
+ this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY);
+ }
+
+ QuorumJournalManager(Configuration conf,
+ URI uri, NamespaceInfo nsInfo,
+ AsyncLogger.Factory loggerFactory) throws IOException {
+ Preconditions.checkArgument(conf != null, "must be configured");
+
+ this.conf = conf;
+ this.uri = uri;
+ this.nsInfo = nsInfo;
+ this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
+
+ // Configure timeouts.
+ this.startSegmentTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT);
+ this.prepareRecoveryTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT);
+ this.acceptRecoveryTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT);
+ this.finalizeSegmentTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT);
+ this.selectInputStreamsTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
+ this.getJournalStateTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
+ this.newEpochTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
+ this.writeTxnsTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
+ }
+
+ protected List<AsyncLogger> createLoggers(
+ AsyncLogger.Factory factory) throws IOException {
+ return createLoggers(conf, uri, nsInfo, factory);
+ }
+
+ static String parseJournalId(URI uri) {
+ String path = uri.getPath();
+ Preconditions.checkArgument(path != null && !path.isEmpty(),
+ "Bad URI '%s': must identify journal in path component",
+ uri);
+ String journalId = path.substring(1);
+ checkJournalId(journalId);
+ return journalId;
+ }
+
+ public static void checkJournalId(String jid) {
+ Preconditions.checkArgument(jid != null &&
+ !jid.isEmpty() &&
+ !jid.contains("/") &&
+ !jid.startsWith("."),
+ "bad journal id: " + jid);
+ }
+
+
+ /**
+ * Fence any previous writers, and obtain a unique epoch number
+ * for write-access to the journal nodes.
+ *
+ * @return the responses of the quorum of JournalNodes to the newEpoch()
+ * call; the new, unique epoch number itself is recorded in the logger set
+ */
+ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
+ throws IOException {
+ Preconditions.checkState(!loggers.isEpochEstablished(),
+ "epoch already created");
+
+ Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
+ loggers.waitForWriteQuorum(loggers.getJournalState(),
+ getJournalStateTimeoutMs, "getJournalState()");
+
+ long maxPromised = Long.MIN_VALUE;
+ for (GetJournalStateResponseProto resp : lastPromises.values()) {
+ maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
+ }
+ assert maxPromised >= 0;
+
+ long myEpoch = maxPromised + 1;
+ Map<AsyncLogger, NewEpochResponseProto> resps =
+ loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
+ newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
+
+ loggers.setEpoch(myEpoch);
+ return resps;
+ }
+
+ @Override
+ public void format(NamespaceInfo nsInfo) throws IOException {
+ QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS,
+ "format");
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for format() response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for format() response");
+ }
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException("Could not format one or more JournalNodes");
+ }
+ }
+
+ @Override
+ public boolean hasSomeData() throws IOException {
+ QuorumCall<AsyncLogger, Boolean> call =
+ loggers.isFormatted();
+
+ try {
+ call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while determining if JNs have data");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for response from loggers");
+ }
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException(
+ "Unable to check if JNs are ready for formatting");
+ }
+
+ // If any of the loggers reported that it already has data, then
+ // we should prompt before formatting.
+ for (Boolean hasData : call.getResults().values()) {
+ if (hasData) {
+ return true;
+ }
+ }
+
+ // Otherwise, none are formatted, so we can safely format.
+ return false;
+ }
+
+ /**
+ * Run recovery/synchronization for a specific segment.
+ * Postconditions:
+ * <ul>
+ * <li>This segment will be finalized on a majority
+ * of nodes.</li>
+ * <li>All nodes which contain the finalized segment will
+ * agree on the length.</li>
+ * </ul>
+ *
+ * @param segmentTxId the starting txid of the segment
+ * @throws IOException
+ */
+ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
+ Preconditions.checkArgument(segmentTxId > 0);
+ LOG.info("Beginning recovery of unclosed segment starting at txid " +
+ segmentTxId);
+
+ // Step 1. Prepare recovery
+ QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
+ loggers.prepareRecovery(segmentTxId);
+ Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses =
+ loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
+ "prepareRecovery(" + segmentTxId + ")");
+ LOG.info("Recovery prepare phase complete. Responses:\n" +
+ QuorumCall.mapToString(prepareResponses));
+
+ // Determine the logger that either:
+ // a) Has already accepted a previous proposal that's higher than any
+ // other
+ //
+ // OR, if no such logger exists:
+ //
+ // b) Has the longest log starting at this transaction ID
+
+ // TODO: we should collect any "ties" and pass the URL for all of them
+ // when syncing, so we can tolerate failure during recovery better.
+ Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
+ prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE);
+ AsyncLogger bestLogger = bestEntry.getKey();
+ PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();
+
+ // Log the above decision, check invariants.
+ if (bestResponse.hasAcceptedInEpoch()) {
+ LOG.info("Using already-accepted recovery for segment " +
+ "starting at txid " + segmentTxId + ": " +
+ bestEntry);
+ } else if (bestResponse.hasSegmentState()) {
+ LOG.info("Using longest log: " + bestEntry);
+ } else {
+ // None of the responses to prepareRecovery() had a segment at the given
+ // txid. This can happen for example in the following situation:
+ // - 3 JNs: JN1, JN2, JN3
+ // - writer starts segment 101 on JN1, then crashes before
+ // writing to JN2 and JN3
+ // - during newEpoch(), we saw the segment on JN1 and decide to
+ // recover segment 101
+ // - before prepare(), JN1 crashes, and we only talk to JN2 and JN3,
+ // neither of which has any entry for this log.
+ // In this case, it is allowed to do nothing for recovery, since the
+ // segment wasn't started on a quorum of nodes.
+
+ // Sanity check: we should only get here if none of the responses had
+ // a log. This should be a postcondition of the recovery comparator,
+ // but a bug in the comparator might cause us to get here.
+ for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
+ assert !resp.hasSegmentState() :
+ "One of the loggers had a response, but no best logger " +
+ "was found.";
+ }
+
+ LOG.info("None of the responders had a log to recover: " +
+ QuorumCall.mapToString(prepareResponses));
+ return;
+ }
+
+ SegmentStateProto logToSync = bestResponse.getSegmentState();
+ assert segmentTxId == logToSync.getStartTxId();
+
+ // Sanity check: none of the loggers should be aware of a higher
+ // txid than the txid we intend to truncate to
+ for (Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> e :
+ prepareResponses.entrySet()) {
+ AsyncLogger logger = e.getKey();
+ PrepareRecoveryResponseProto resp = e.getValue();
+
+ if (resp.hasLastCommittedTxId() &&
+ resp.getLastCommittedTxId() > logToSync.getEndTxId()) {
+ throw new AssertionError("Decided to synchronize log to " + logToSync +
+ " but logger " + logger + " had seen txid " +
+ resp.getLastCommittedTxId() + " committed");
+ }
+ }
+
+ URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
+
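+ // Step 2. Ask a quorum of loggers to accept the chosen recovery proposal.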
+ QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
+ loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
+ "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
+
+ // Step 3. Finalize the segment. If one of the loggers missed the
+ // synchronization step above, that's OK: it validates the log before
+ // finalizing, so even a node that is not "in sync" won't incorrectly
+ // finalize.
+ QuorumCall<AsyncLogger, Void> finalize =
+ loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId());
+ loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
+ String.format("finalizeLogSegment(%s-%s)",
+ logToSync.getStartTxId(),
+ logToSync.getEndTxId()));
+ }
+
+ static List<AsyncLogger> createLoggers(Configuration conf,
+ URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
+ throws IOException {
+ List<AsyncLogger> ret = Lists.newArrayList();
+ List<InetSocketAddress> addrs = getLoggerAddresses(uri);
+ String jid = parseJournalId(uri);
+ for (InetSocketAddress addr : addrs) {
+ ret.add(factory.createLogger(conf, nsInfo, jid, addr));
+ }
+ return ret;
+ }
+
+ private static List<InetSocketAddress> getLoggerAddresses(URI uri)
+ throws IOException {
+ String authority = uri.getAuthority();
+ Preconditions.checkArgument(authority != null && !authority.isEmpty(),
+ "URI has no authority: " + uri);
+
+ String[] parts = StringUtils.split(authority, ';');
+ for (int i = 0; i < parts.length; i++) {
+ parts[i] = parts[i].trim();
+ }
+
+ if (parts.length % 2 == 0) {
+ LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
+ "of Journal Nodes specified. This is not recommended!");
+ }
+
+ List<InetSocketAddress> addrs = Lists.newArrayList();
+ for (String addr : parts) {
+ addrs.add(NetUtils.createSocketAddr(
+ addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT));
+ }
+ return addrs;
+ }
+
+ @Override
+ public EditLogOutputStream startLogSegment(long txId) throws IOException {
+ Preconditions.checkState(isActiveWriter,
+ "must recover segments before starting a new one");
+ QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
+ loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
+ "startLogSegment(" + txId + ")");
+ return new QuorumOutputStream(loggers, txId,
+ outputBufferCapacity, writeTxnsTimeoutMs);
+ }
+
+ @Override
+ public void finalizeLogSegment(long firstTxId, long lastTxId)
+ throws IOException {
+ QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
+ firstTxId, lastTxId);
+ loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
+ String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
+ }
+
+ @Override
+ public void setOutputBufferCapacity(int size) {
+ outputBufferCapacity = size;
+ }
+
+ @Override
+ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+ // This purges asynchronously -- there's no need to wait for a quorum
+ // here, because it's always OK to fail.
+ LOG.info("Purging remote journals older than txid " + minTxIdToKeep);
+ loggers.purgeLogsOlderThan(minTxIdToKeep);
+ }
+
+ @Override
+ public void recoverUnfinalizedSegments() throws IOException {
+ Preconditions.checkState(!isActiveWriter, "already active writer");
+
+ LOG.info("Starting recovery process for unclosed journal segments...");
+ Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
+ LOG.info("Successfully started new epoch " + loggers.getEpoch());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
+ QuorumCall.mapToString(resps));
+ }
+
+ long mostRecentSegmentTxId = Long.MIN_VALUE;
+ for (NewEpochResponseProto r : resps.values()) {
+ if (r.hasLastSegmentTxId()) {
+ mostRecentSegmentTxId = Math.max(mostRecentSegmentTxId,
+ r.getLastSegmentTxId());
+ }
+ }
+
+ // On a completely fresh system, none of the journals have any
+ // segments, so there's nothing to recover.
+ if (mostRecentSegmentTxId != Long.MIN_VALUE) {
+ recoverUnclosedSegment(mostRecentSegmentTxId);
+ }
+ isActiveWriter = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ loggers.close();
+ }
+
+ @Override
+ public void selectInputStreams(Collection<EditLogInputStream> streams,
+ long fromTxnId, boolean inProgressOk) throws IOException {
+
+ QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
+ loggers.getEditLogManifest(fromTxnId);
+ Map<AsyncLogger, RemoteEditLogManifest> resps =
+ loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+ "selectInputStreams");
+
+ LOG.debug("selectInputStream manifests:\n" +
+ Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
+
+ final PriorityQueue<EditLogInputStream> allStreams =
+ new PriorityQueue<EditLogInputStream>(64,
+ JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+ for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
+ AsyncLogger logger = e.getKey();
+ RemoteEditLogManifest manifest = e.getValue();
+
+ for (RemoteEditLog remoteLog : manifest.getLogs()) {
+ URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
+
+ EditLogInputStream elis = EditLogFileInputStream.fromUrl(
+ url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
+ remoteLog.isInProgress());
+ allStreams.add(elis);
+ }
+ }
+ JournalSet.chainAndMakeRedundantStreams(
+ streams, allStreams, fromTxnId, inProgressOk);
+ }
+
+ @Override
+ public String toString() {
+ return "QJM to " + loggers;
+ }
+
+ @VisibleForTesting
+ AsyncLoggerSet getLoggerSetForTests() {
+ return loggers;
+ }
+
+}
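
The two URI helpers above, parseJournalId() and getLoggerAddresses(), imply the shape of a quorum journal URI: the authority component is a semicolon-separated list of JournalNode addresses (8485 being the default JournalNode RPC port) and the path component names the journal. A small sketch with placeholder hostnames, not part of this commit:

    package org.apache.hadoop.hdfs.qjournal.client;

    import java.net.URI;

    public class QjmUriExample {
      public static void main(String[] args) throws Exception {
        // Three JournalNodes in the authority; the journal id in the path.
        URI uri = new URI("qjournal://jn1.example.com:8485;"
            + "jn2.example.com:8485;jn3.example.com:8485/mycluster");

        // Prints "mycluster".
        System.out.println(QuorumJournalManager.parseJournalId(uri));
      }
    }
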
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditsDoubleBuffer;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * EditLogOutputStream implementation that writes to a quorum of
+ * remote journals.
+ */
+class QuorumOutputStream extends EditLogOutputStream {
+ private final AsyncLoggerSet loggers;
+ private EditsDoubleBuffer buf;
+ private final long segmentTxId;
+ private final int writeTimeoutMs;
+
+ public QuorumOutputStream(AsyncLoggerSet loggers,
+ long txId, int outputBufferCapacity,
+ int writeTimeoutMs) throws IOException {
+ super();
+ this.buf = new EditsDoubleBuffer(outputBufferCapacity);
+ this.loggers = loggers;
+ this.segmentTxId = txId;
+ this.writeTimeoutMs = writeTimeoutMs;
+ }
+
+ @Override
+ public void write(FSEditLogOp op) throws IOException {
+ buf.writeOp(op);
+ }
+
+ @Override
+ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ buf.writeRaw(bytes, offset, length);
+ }
+
+ @Override
+ public void create() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (buf != null) {
+ buf.close();
+ buf = null;
+ }
+ }
+
+ @Override
+ public void abort() throws IOException {
+ QuorumJournalManager.LOG.warn("Aborting " + this);
+ buf = null;
+ close();
+ }
+
+ @Override
+ public void setReadyToFlush() throws IOException {
+ buf.setReadyToFlush();
+ }
+
+ @Override
+ protected void flushAndSync(boolean durable) throws IOException {
+ int numReadyBytes = buf.countReadyBytes();
+ if (numReadyBytes > 0) {
+ int numReadyTxns = buf.countReadyTxns();
+ long firstTxToFlush = buf.getFirstReadyTxId();
+
+ assert numReadyTxns > 0;
+
+ // Copy from our double-buffer into a new byte array. This is for
+ // two reasons:
+ // 1) The IPC code has no way of specifying to send only a slice of
+ // a larger array.
+ // 2) because the calls to the underlying nodes are asynchronous, we
+ // need a defensive copy to avoid accidentally mutating the buffer
+ // before it is sent.
+ DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
+ buf.flushTo(bufToSend);
+ assert bufToSend.getLength() == numReadyBytes;
+ byte[] data = bufToSend.getData();
+ assert data.length == bufToSend.getLength();
+
+ QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
+ segmentTxId, firstTxToFlush,
+ numReadyTxns, data);
+ loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");
+
+ // Since we successfully wrote this batch, record the new committed txid.
+ // Future RPCs will carry it to the loggers, so even a logger that has
+ // fallen behind learns the most recently committed transaction.
+ loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
+ }
+ }
+
+ @Override
+ public String generateHtmlReport() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Writing segment beginning at txid " + segmentTxId + "<br/>\n");
+ loggers.appendHtmlReport(sb);
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "QuorumOutputStream starting at txid " + segmentTxId;
+ }
+}
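
The defensive-copy reasoning in flushAndSync() above rests on EditsDoubleBuffer's swap-then-copy discipline. A stripped-down sketch of that pattern, independent of the Hadoop classes (all names here are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    // Writers append to 'current' while a previously swapped 'ready'
    // buffer is copied out and shipped to the loggers asynchronously.
    public class DoubleBufferSketch {
      private ByteArrayOutputStream current = new ByteArrayOutputStream();
      private ByteArrayOutputStream ready = new ByteArrayOutputStream();

      public synchronized void write(byte[] record) throws IOException {
        current.write(record);
      }

      public synchronized void setReadyToFlush() {
        // Swap the buffers: new writes land in the old (drained) buffer.
        ByteArrayOutputStream tmp = ready;
        ready = current;
        current = tmp;
      }

      public synchronized byte[] copyReadyBytes() {
        // Defensive copy, for the same reason as in flushAndSync(): the
        // asynchronous sends must not observe later mutations.
        byte[] data = ready.toByteArray();
        ready.reset();
        return data;
      }
    }
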
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.util.Comparator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.primitives.Booleans;
+
+/**
+ * Compares responses to the prepareRecovery RPC. This is responsible for
+ * determining the correct length to recover.
+ */
+class SegmentRecoveryComparator
+ implements Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>> {
+
+ static final SegmentRecoveryComparator INSTANCE = new SegmentRecoveryComparator();
+
+ @Override
+ public int compare(
+ Entry<AsyncLogger, PrepareRecoveryResponseProto> a,
+ Entry<AsyncLogger, PrepareRecoveryResponseProto> b) {
+
+ PrepareRecoveryResponseProto r1 = a.getValue();
+ PrepareRecoveryResponseProto r2 = b.getValue();
+
+ // A response that has data for a segment is always better than one
+ // that doesn't.
+ if (r1.hasSegmentState() != r2.hasSegmentState()) {
+ return Booleans.compare(r1.hasSegmentState(), r2.hasSegmentState());
+ }
+
+ if (!r1.hasSegmentState()) {
+ // Neither has a segment, so neither can be used for recovery.
+ // Call them equal.
+ return 0;
+ }
+
+ // They both have a segment.
+ SegmentStateProto r1Seg = r1.getSegmentState();
+ SegmentStateProto r2Seg = r2.getSegmentState();
+
+ Preconditions.checkArgument(r1Seg.getStartTxId() == r2Seg.getStartTxId(),
+ "Should only be called with responses for corresponding segments: " +
+ "%s and %s do not have the same start txid.", r1, r2);
+
+ // If one is in-progress but the other is finalized,
+ // the finalized one is greater.
+ if (r1Seg.getIsInProgress() != r2Seg.getIsInProgress()) {
+ return Booleans.compare(!r1Seg.getIsInProgress(), !r2Seg.getIsInProgress());
+ }
+
+ if (!r1Seg.getIsInProgress()) {
+ // If both are finalized, they should match lengths
+ if (r1Seg.getEndTxId() != r2Seg.getEndTxId()) {
+ throw new AssertionError("finalized segs with different lengths: " +
+ r1 + ", " + r2);
+ }
+ return 0;
+ }
+
+ // Both are in-progress.
+ long r1SeenEpoch = Math.max(r1.getAcceptedInEpoch(), r1.getLastWriterEpoch());
+ long r2SeenEpoch = Math.max(r2.getAcceptedInEpoch(), r2.getLastWriterEpoch());
+
+ return ComparisonChain.start()
+ .compare(r1SeenEpoch, r2SeenEpoch)
+ .compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId())
+ .result();
+ }
+}
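
To make the ordering concrete: among two in-progress responses for the same segment, the higher max(acceptedInEpoch, lastWriterEpoch) wins outright, and endTxId only breaks ties. A hedged sketch (assuming the proto builders take exactly the fields read above; the class name, txids, and epochs are made up):

    package org.apache.hadoop.hdfs.qjournal.client;

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map.Entry;

    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;

    public class RecoveryComparatorExample {
      private static Entry<AsyncLogger, PrepareRecoveryResponseProto> resp(
          long endTxId, long lastWriterEpoch) {
        PrepareRecoveryResponseProto r = PrepareRecoveryResponseProto.newBuilder()
            .setSegmentState(SegmentStateProto.newBuilder()
                .setStartTxId(101).setEndTxId(endTxId).setIsInProgress(true))
            .setLastWriterEpoch(lastWriterEpoch)
            .build();
        return new SimpleEntry<AsyncLogger, PrepareRecoveryResponseProto>(null, r);
      }

      public static void main(String[] args) {
        // [101-170] written in epoch 2 vs. [101-150] written in epoch 3:
        // the epoch-3 response is "greater" despite its shorter log.
        int c = SegmentRecoveryComparator.INSTANCE.compare(
            resp(170, 2), resp(150, 3));
        System.out.println(c < 0 ? "epoch 3 preferred" : "epoch 2 preferred");
      }
    }
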
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+
+/**
+ * Exception indicating that a call has been made to a JournalNode
+ * which is not yet formatted.
+ */
+@InterfaceAudience.Private
+public class JournalNotFormattedException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public JournalNotFormattedException(String msg) {
+ super(msg);
+ }
+
+}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class JournalOutOfSyncException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public JournalOutOfSyncException(String msg) {
+ super(msg);
+ }
+
+}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to communicate between {@link QuorumJournalManager}
+ * and each {@link JournalNode}.
+ *
+ * This is responsible for sending edits as well as coordinating
+ * recovery of the nodes.
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface QJournalProtocol {
+ public static final long versionID = 1L;
+
+ /**
+ * @return true if the given journal has been formatted and
+ * contains valid data.
+ */
+ public boolean isFormatted(String journalId) throws IOException;
+
+ /**
+ * Get the current state of the journal, including the most recent
+ * epoch number and the HTTP port.
+ */
+ public GetJournalStateResponseProto getJournalState(String journalId)
+ throws IOException;
+
+ /**
+ * Format the underlying storage for the given namespace.
+ */
+ public void format(String journalId,
+ NamespaceInfo nsInfo) throws IOException;
+
+ /**
+ * Begin a new epoch. See the HDFS-3077 design doc for details.
+ */
+ public NewEpochResponseProto newEpoch(String journalId,
+ NamespaceInfo nsInfo, long epoch) throws IOException;
+
+ /**
+ * Journal edit records.
+ * This message is sent by the active name-node to the JournalNodes
+ * to write edits to their local logs.
+ */
+ public void journal(RequestInfo reqInfo,
+ long segmentTxId,
+ long firstTxnId,
+ int numTxns,
+ byte[] records) throws IOException;
+
+
+ /**
+ * Heartbeat.
+ * This is a no-op on the server, except that it verifies that the
+ * caller is in fact still the active writer, and provides up-to-date
+ * information on the most recently committed txid.
+ */
+ public void heartbeat(RequestInfo reqInfo) throws IOException;
+
+ /**
+ * Start writing to a new log segment on the JournalNode.
+ * Before calling this, one should finalize the previous segment
+ * using {@link #finalizeLogSegment(RequestInfo, long, long)}.
+ *
+ * @param txid the first txid in the new log
+ */
+ public void startLogSegment(RequestInfo reqInfo,
+ long txid) throws IOException;
+
+ /**
+ * Finalize the given log segment on the JournalNode. The segment
+ * is expected to be in-progress and starting at the given startTxId.
+ *
+ * @param startTxId the starting transaction ID of the log
+ * @param endTxId the expected last transaction in the given log
+ * @throws IOException if no such segment exists
+ */
+ public void finalizeLogSegment(RequestInfo reqInfo,
+ long startTxId, long endTxId) throws IOException;
+
+ /**
+ * @throws IOException
+ * @see JournalManager#purgeLogsOlderThan(long)
+ */
+ public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep)
+ throws IOException;
+
+ /**
+ * @param jid the journal from which to enumerate edits
+ * @param sinceTxId the first transaction which the client cares about
+ * @return a list of edit log segments since the given transaction ID.
+ */
+ public GetEditLogManifestResponseProto getEditLogManifest(
+ String jid, long sinceTxId) throws IOException;
+
+ /**
+ * Begin the recovery process for a given segment. See the HDFS-3077
+ * design document for details.
+ */
+ public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+ long segmentTxId) throws IOException;
+
+ /**
+ * Accept a proposed recovery for the given transaction ID.
+ */
+ public void acceptRecovery(RequestInfo reqInfo,
+ SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
+}
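
Nearly all of these methods take a RequestInfo, and the fencing they rely on (per the HDFS-3077 design document cited above) is that a JournalNode refuses requests from any writer whose epoch is below the highest epoch it has promised. A toy, deliberately simplified sketch of that check, not the JournalNode's actual code:

    import java.io.IOException;

    // Toy epoch fence: once a higher epoch has been promised, requests
    // carrying any lower epoch are rejected.
    public class EpochFenceSketch {
      private long lastPromisedEpoch = 0;

      public synchronized void checkRequest(long requestEpoch)
          throws IOException {
        if (requestEpoch < lastPromisedEpoch) {
          throw new IOException("epoch " + requestEpoch
              + " is older than the last promised epoch " + lastPromisedEpoch);
        }
        lastPromisedEpoch = Math.max(lastPromisedEpoch, requestEpoch);
      }
    }
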
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+@InterfaceAudience.Private
+public class RequestInfo {
+ private String jid;
+ private long epoch;
+ private long ipcSerialNumber;
+ private long committedTxId;
+
+ public RequestInfo(String jid, long epoch, long ipcSerialNumber,
+ long committedTxId) {
+ this.jid = jid;
+ this.epoch = epoch;
+ this.ipcSerialNumber = ipcSerialNumber;
+ this.committedTxId = committedTxId;
+ }
+
+ public long getEpoch() {
+ return epoch;
+ }
+
+ public void setEpoch(long epoch) {
+ this.epoch = epoch;
+ }
+
+ public String getJournalId() {
+ return jid;
+ }
+
+ public long getIpcSerialNumber() {
+ return ipcSerialNumber;
+ }
+
+ public void setIpcSerialNumber(long ipcSerialNumber) {
+ this.ipcSerialNumber = ipcSerialNumber;
+ }
+
+ public long getCommittedTxId() {
+ return committedTxId;
+ }
+
+ public boolean hasCommittedTxId() {
+ return (committedTxId != HdfsConstants.INVALID_TXID);
+ }
+}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to journal edits to a JournalNode participating
+ * in the quorum journal.
+ * Note: This extends the protocol buffer service-based interface to
+ * add the annotations required for security.
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Private
+public interface QJournalProtocolPB extends
+ QJournalProtocolService.BlockingInterface {
+}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.net.URL;
+
+/**
+ * Implementation of the protobuf service that forwards requests
+ * received on {@link QJournalProtocolPB} to the
+ * {@link QJournalProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolPB {
+ /** Server side implementation to delegate the requests to */
+ private final QJournalProtocol impl;
+
+ public QJournalProtocolServerSideTranslatorPB(QJournalProtocol impl) {
+ this.impl = impl;
+ }
+
+
+ @Override
+ public IsFormattedResponseProto isFormatted(RpcController controller,
+ IsFormattedRequestProto request) throws ServiceException {
+ try {
+ boolean ret = impl.isFormatted(
+ convert(request.getJid()));
+ return IsFormattedResponseProto.newBuilder()
+ .setIsFormatted(ret)
+ .build();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+
+ @Override
+ public GetJournalStateResponseProto getJournalState(RpcController controller,
+ GetJournalStateRequestProto request) throws ServiceException {
+ try {
+ return impl.getJournalState(
+ convert(request.getJid()));
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ private String convert(JournalIdProto jid) {
+ return jid.getIdentifier();
+ }
+
+ @Override
+ public NewEpochResponseProto newEpoch(RpcController controller,
+ NewEpochRequestProto request) throws ServiceException {
+ try {
+ return impl.newEpoch(
+ request.getJid().getIdentifier(),
+ PBHelper.convert(request.getNsInfo()),
+ request.getEpoch());
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ @Override
+ public FormatResponseProto format(RpcController controller,
+ FormatRequestProto request) throws ServiceException {
+ try {
+ impl.format(request.getJid().getIdentifier(),
+ PBHelper.convert(request.getNsInfo()));
+ return FormatResponseProto.getDefaultInstance();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
+ /** @see QJournalProtocol#journal */
+ @Override
+ public JournalResponseProto journal(RpcController unused,
+ JournalRequestProto req) throws ServiceException {
+ try {
+ impl.journal(convert(req.getReqInfo()),
+ req.getSegmentTxnId(), req.getFirstTxnId(),
+ req.getNumTxns(), req.getRecords().toByteArray());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return JournalResponseProto.newBuilder().build();
+ }
+
+ /** @see QJournalProtocol#heartbeat */
+ @Override
+ public HeartbeatResponseProto heartbeat(RpcController controller,
+ HeartbeatRequestProto req) throws ServiceException {
+ try {
+ impl.heartbeat(convert(req.getReqInfo()));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return HeartbeatResponseProto.getDefaultInstance();
+ }
+
+ /** @see QJournalProtocol#startLogSegment */
+ @Override
+ public StartLogSegmentResponseProto startLogSegment(RpcController controller,
+ StartLogSegmentRequestProto req) throws ServiceException {
+ try {
+ impl.startLogSegment(convert(req.getReqInfo()),
+ req.getTxid());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return StartLogSegmentResponseProto.newBuilder().build();
+ }
+
+ @Override
+ public FinalizeLogSegmentResponseProto finalizeLogSegment(
+ RpcController controller, FinalizeLogSegmentRequestProto req)
+ throws ServiceException {
+ try {
+ impl.finalizeLogSegment(convert(req.getReqInfo()),
+ req.getStartTxId(), req.getEndTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return FinalizeLogSegmentResponseProto.newBuilder().build();
+ }
+
+ @Override
+ public PurgeLogsResponseProto purgeLogs(RpcController controller,
+ PurgeLogsRequestProto req) throws ServiceException {
+ try {
+ impl.purgeLogsOlderThan(convert(req.getReqInfo()),
+ req.getMinTxIdToKeep());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return PurgeLogsResponseProto.getDefaultInstance();
+ }
+
+ @Override
+ public GetEditLogManifestResponseProto getEditLogManifest(
+ RpcController controller, GetEditLogManifestRequestProto request)
+ throws ServiceException {
+ try {
+ return impl.getEditLogManifest(
+ request.getJid().getIdentifier(),
+ request.getSinceTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+
+ @Override
+ public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
+ PrepareRecoveryRequestProto request) throws ServiceException {
+ try {
+ return impl.prepareRecovery(convert(request.getReqInfo()),
+ request.getSegmentTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public AcceptRecoveryResponseProto acceptRecovery(RpcController controller,
+ AcceptRecoveryRequestProto request) throws ServiceException {
+ try {
+ impl.acceptRecovery(convert(request.getReqInfo()),
+ request.getStateToAccept(),
+ new URL(request.getFromURL()));
+ return AcceptRecoveryResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+
+ private RequestInfo convert(
+ QJournalProtocolProtos.RequestInfoProto reqInfo) {
+ return new RequestInfo(
+ reqInfo.getJournalId().getIdentifier(),
+ reqInfo.getEpoch(),
+ reqInfo.getIpcSerialNumber(),
+ reqInfo.hasCommittedTxId() ?
+ reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID);
+ }
+}
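
Every method above follows the same translation pattern: unwrap the request proto, delegate to the QJournalProtocol implementation, wrap the result in a response proto, and convert any IOException into a ServiceException for the RPC layer. A test-style sketch of that round trip, assuming Mockito as a stand-in for a real JournalNode-side implementation (the journal id "myjournal" is made up for illustration):

    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedRequestProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedResponseProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
    import org.mockito.Mockito;

    // Hypothetical sketch; assumes the same package as the translator.
    public class ServerTranslatorSketch {
      public static void main(String[] args) throws Exception {
        // Mocked server-side implementation standing in for a JournalNode.
        QJournalProtocol impl = Mockito.mock(QJournalProtocol.class);
        Mockito.when(impl.isFormatted("myjournal")).thenReturn(true);

        QJournalProtocolServerSideTranslatorPB translator =
            new QJournalProtocolServerSideTranslatorPB(impl);

        // Build the wire-format request the RPC layer would hand us.
        IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder()
            .setJid(JournalIdProto.newBuilder().setIdentifier("myjournal"))
            .build();

        // The translator unwraps the proto, delegates, and re-wraps.
        IsFormattedResponseProto resp = translator.isFormatted(null, req);
        System.out.println("formatted? " + resp.getIsFormatted());
      }
    }

The translator ignores its RpcController argument, so passing null suffices here.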
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.IsFormattedResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.RequestInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client-side translator that translates requests made on
+ * the {@link QJournalProtocol} interface into calls on the RPC server
+ * implementing {@link QJournalProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
+ QJournalProtocol, Closeable {
+ /** RpcController is not used and hence is set to null */
+ private static final RpcController NULL_CONTROLLER = null;
+ private final QJournalProtocolPB rpcProxy;
+
+ public QJournalProtocolTranslatorPB(QJournalProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+
+ @Override
+ public boolean isFormatted(String journalId) throws IOException {
+ try {
+ IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId))
+ .build();
+ IsFormattedResponseProto resp = rpcProxy.isFormatted(
+ NULL_CONTROLLER, req);
+ return resp.getIsFormatted();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public GetJournalStateResponseProto getJournalState(String jid)
+ throws IOException {
+ try {
+ GetJournalStateRequestProto req = GetJournalStateRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .build();
+ return rpcProxy.getJournalState(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ private JournalIdProto convertJournalId(String jid) {
+ return JournalIdProto.newBuilder()
+ .setIdentifier(jid)
+ .build();
+ }
+
+ @Override
+ public void format(String jid, NamespaceInfo nsInfo) throws IOException {
+ try {
+ FormatRequestProto req = FormatRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setNsInfo(PBHelper.convert(nsInfo))
+ .build();
+ rpcProxy.format(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
+ long epoch) throws IOException {
+ try {
+ NewEpochRequestProto req = NewEpochRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setNsInfo(PBHelper.convert(nsInfo))
+ .setEpoch(epoch)
+ .build();
+ return rpcProxy.newEpoch(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void journal(RequestInfo reqInfo,
+ long segmentTxId, long firstTxnId, int numTxns,
+ byte[] records) throws IOException {
+ JournalRequestProto req = JournalRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setSegmentTxnId(segmentTxId)
+ .setFirstTxnId(firstTxnId)
+ .setNumTxns(numTxns)
+ .setRecords(PBHelper.getByteString(records))
+ .build();
+ try {
+ rpcProxy.journal(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void heartbeat(RequestInfo reqInfo) throws IOException {
+ try {
+ rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ private QJournalProtocolProtos.RequestInfoProto convert(
+ RequestInfo reqInfo) {
+ RequestInfoProto.Builder builder = RequestInfoProto.newBuilder()
+ .setJournalId(convertJournalId(reqInfo.getJournalId()))
+ .setEpoch(reqInfo.getEpoch())
+ .setIpcSerialNumber(reqInfo.getIpcSerialNumber());
+ if (reqInfo.hasCommittedTxId()) {
+ builder.setCommittedTxId(reqInfo.getCommittedTxId());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public void startLogSegment(RequestInfo reqInfo, long txid)
+ throws IOException {
+ StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setTxid(txid)
+ .build();
+ try {
+ rpcProxy.startLogSegment(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+ long endTxId) throws IOException {
+ FinalizeLogSegmentRequestProto req =
+ FinalizeLogSegmentRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setStartTxId(startTxId)
+ .setEndTxId(endTxId)
+ .build();
+ try {
+ rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
+ throws IOException {
+ PurgeLogsRequestProto req = PurgeLogsRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setMinTxIdToKeep(minTxIdToKeep)
+ .build();
+ try {
+ rpcProxy.purgeLogs(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public GetEditLogManifestResponseProto getEditLogManifest(String jid,
+ long sinceTxId) throws IOException {
+ try {
+ return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
+ GetEditLogManifestRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setSinceTxId(sinceTxId)
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+ long segmentTxId) throws IOException {
+ try {
+ return rpcProxy.prepareRecovery(NULL_CONTROLLER,
+ PrepareRecoveryRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setSegmentTxId(segmentTxId)
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void acceptRecovery(RequestInfo reqInfo,
+ SegmentStateProto stateToAccept, URL fromUrl) throws IOException {
+ try {
+ rpcProxy.acceptRecovery(NULL_CONTROLLER,
+ AcceptRecoveryRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setStateToAccept(stateToAccept)
+ .setFromURL(fromUrl.toExternalForm())
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy,
+ QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
+ }
+
+}
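
The client-side translator runs the same pattern in reverse: plain QJournalProtocol calls are packed into request protos, and ServiceExceptions from the proxy are unwrapped back into IOExceptions via ProtobufHelper.getRemoteException. A sketch of how such a translator is typically constructed, assuming a reachable JournalNode address (the helper class and method names are hypothetical):

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;

    // Hypothetical helper; assumes the same package as the translator.
    public class QJournalClientSketch {
      /** Build a QJournalProtocol client talking to one JournalNode. */
      public static QJournalProtocol connect(Configuration conf,
          InetSocketAddress addr) throws IOException {
        RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
            ProtobufRpcEngine.class);
        // The raw proxy speaks protobuf request/response messages.
        QJournalProtocolPB pbProxy = RPC.getProxy(
            QJournalProtocolPB.class,
            RPC.getProtocolVersion(QJournalProtocolPB.class),
            addr, conf);
        // The translator hides the protos behind QJournalProtocol.
        return new QJournalProtocolTranslatorPB(pbProxy);
      }
    }

Callers can then invoke, say, isFormatted("myjournal") and handle ordinary IOExceptions; closing the translator releases the underlying proxy through RPC.stopProxy, as the close() override above shows.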