You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/02/27 00:32:14 UTC
svn commit: r1293964 [4/11] - in
/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs:
./ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/...
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryResponseProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link InterDatanodeProtocolPB} to the
+ * {@link InterDatanodeProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class InterDatanodeProtocolServerSideTranslatorPB implements
+ InterDatanodeProtocolPB {
+ private final InterDatanodeProtocol impl;
+
+ public InterDatanodeProtocolServerSideTranslatorPB(InterDatanodeProtocol impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public InitReplicaRecoveryResponseProto initReplicaRecovery(
+ RpcController unused, InitReplicaRecoveryRequestProto request)
+ throws ServiceException {
+ RecoveringBlock b = PBHelper.convert(request.getBlock());
+ ReplicaRecoveryInfo r;
+ try {
+ r = impl.initReplicaRecovery(b);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return InitReplicaRecoveryResponseProto.newBuilder()
+ .setBlock(PBHelper.convert(r))
+ .setState(PBHelper.convert(r.getOriginalReplicaState())).build();
+ }
+
+ @Override
+ public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery(
+ RpcController unused, UpdateReplicaUnderRecoveryRequestProto request)
+ throws ServiceException {
+ ExtendedBlock b;
+ try {
+ b = impl.updateReplicaUnderRecovery(PBHelper.convert(request.getBlock()),
+ request.getRecoveryId(), request.getNewLength());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
+ .setBlock(PBHelper.convert(b)).build();
+ }
+
+ /** @see VersionedProtocol#getProtocolVersion */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return RPC.getProtocolVersion(InterDatanodeProtocolPB.class);
+ }
+
+ /**
+ * The client side will redirect getProtocolSignature to
+ * getProtocolSignature2.
+ *
+ * However the RPC layer below on the Server side will call getProtocolVersion
+ * and possibly in the future getProtocolSignature. Hence we still implement
+ * it even though the end client will never call this method.
+ *
+ * @see VersionedProtocol#getProtocolVersion
+ */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ /**
+ * Don't forward this to the server. The protocol version and signature is
+ * that of {@link InterDatanodeProtocol}
+ */
+ if (!protocol.equals(RPC.getProtocolName(InterDatanodeProtocol.class))) {
+ throw new IOException("Namenode Serverside implements " +
+ RPC.getProtocolName(InterDatanodeProtocol.class) +
+ ". The following requested protocol is unknown: " + protocol);
+ }
+
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ RPC.getProtocolVersion(InterDatanodeProtocolPB.class),
+ InterDatanodeProtocolPB.class);
+ }
+
+
+ @Override
+ public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ /**
+ * Don't forward this to the server. The protocol version and signature is
+ * that of {@link InterDatanodeProtocol}
+ */
+ return ProtocolSignatureWritable.convert(
+ this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link InterDatanodeProtocol} interfaces to the RPC server implementing
+ * {@link InterDatanodeProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class InterDatanodeProtocolTranslatorPB implements
+    ProtocolMetaInterface, InterDatanodeProtocol, Closeable {
+  /** RpcController is not used and hence is set to null. */
+  private static final RpcController NULL_CONTROLLER = null;
+  /** Protobuf RPC proxy through which all calls are forwarded. */
+  private final InterDatanodeProtocolPB proxy;
+
+  public InterDatanodeProtocolTranslatorPB(InetSocketAddress addr,
+      UserGroupInformation ugi, Configuration conf, SocketFactory factory,
+      int socketTimeout) throws IOException {
+    // Register the protobuf RPC engine for this protocol before creating
+    // the proxy.
+    RPC.setProtocolEngine(conf, InterDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    final long version = RPC.getProtocolVersion(InterDatanodeProtocolPB.class);
+    proxy = RPC.getProxy(InterDatanodeProtocolPB.class, version, addr, ugi,
+        conf, factory, socketTimeout);
+  }
+
+  @Override
+  public void close() {
+    RPC.stopProxy(proxy);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocolName, long clientVersion)
+      throws IOException {
+    return proxy.getProtocolVersion(protocolName, clientVersion);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    // Fetch the signature via the R23-compatible method, then convert the
+    // writable form back to a ProtocolSignature.
+    ProtocolSignatureWritable writable = proxy.getProtocolSignature2(
+        protocol, clientVersion, clientMethodsHash);
+    return ProtocolSignatureWritable.convert(writable);
+  }
+
+  @Override
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+      throws IOException {
+    InitReplicaRecoveryRequestProto request = InitReplicaRecoveryRequestProto
+        .newBuilder().setBlock(PBHelper.convert(rBlock)).build();
+    final InitReplicaRecoveryResponseProto response;
+    try {
+      response = proxy.initReplicaRecovery(NULL_CONTROLLER, request);
+    } catch (ServiceException e) {
+      // Unwrap the remote IOException carried inside the ServiceException.
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    BlockProto blockProto = response.getBlock();
+    return new ReplicaRecoveryInfo(blockProto.getBlockId(),
+        blockProto.getNumBytes(), blockProto.getGenStamp(),
+        PBHelper.convert(response.getState()));
+  }
+
+  @Override
+  public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+      long recoveryId, long newLength) throws IOException {
+    UpdateReplicaUnderRecoveryRequestProto request =
+        UpdateReplicaUnderRecoveryRequestProto.newBuilder()
+            .setBlock(PBHelper.convert(oldBlock))
+            .setRecoveryId(recoveryId)
+            .setNewLength(newLength)
+            .build();
+    try {
+      return PBHelper.convert(
+          proxy.updateReplicaUnderRecovery(NULL_CONTROLLER, request)
+              .getBlock());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public boolean isMethodSupported(String methodName) throws IOException {
+    return RpcClientUtil.isMethodSupported(proxy,
+        InterDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(InterDatanodeProtocolPB.class), methodName);
+  }
+}
Copied: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java (from r1293950, hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java?p2=hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java&p1=hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java&r1=1293950&r2=1293964&rev=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java Sun Feb 26 23:32:06 2012
@@ -15,48 +15,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.protocol;
+package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol used to journal edits to a remote node. Currently,
* this is used to publish edits from the NameNode to a BackupNode.
+ *
+ * Note: This extends the protocolbuffer service based interface to
+ * add annotations required for security.
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.hdfs.server.protocol.JournalProtocol",
+ protocolVersion = 1)
@InterfaceAudience.Private
-public interface JournalProtocol extends VersionedProtocol {
- public static final long versionID = 1L;
-
- /**
- * Journal edit records.
- * This message is sent by the active name-node to the backup node
- * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
- * changes with the backup namespace image.
- *
- * @param registration active node registration
- * @param firstTxnId the first transaction of this batch
- * @param numTxns number of transactions
- * @param records byte array containing serialized journal records
- */
- public void journal(NamenodeRegistration registration,
- long firstTxnId,
- int numTxns,
- byte[] records) throws IOException;
-
+public interface JournalProtocolPB extends
+ JournalProtocolService.BlockingInterface, VersionedProtocol {
/**
- * Notify the BackupNode that the NameNode has rolled its edit logs
- * and is now writing a new log segment.
- * @param registration the registration of the active NameNode
- * @param txid the first txid in the new log
+ * This method is defined to get the protocol signature using
+ * the R23 protocol - hence we have added the suffix of 2 to the method name
+ * to avoid conflict.
*/
- public void startLogSegment(NamenodeRegistration registration,
- long txid) throws IOException;
+ public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException;
}
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link JournalProtocolPB} to the
+ * {@link JournalProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB {
+ /** Server side implementation to delegate the requests to */
+ private final JournalProtocol impl;
+
+ public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) {
+ this.impl = impl;
+ }
+
+ /** @see JournalProtocol#journal */
+ @Override
+ public JournalResponseProto journal(RpcController unused,
+ JournalRequestProto req) throws ServiceException {
+ try {
+ // Unwrap the protobuf request and delegate; IOExceptions must be
+ // wrapped in ServiceException for protobuf RPC.
+ impl.journal(PBHelper.convert(req.getRegistration()),
+ req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
+ .toByteArray());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return JournalResponseProto.newBuilder().build();
+ }
+
+ /** @see JournalProtocol#startLogSegment */
+ @Override
+ public StartLogSegmentResponseProto startLogSegment(RpcController controller,
+ StartLogSegmentRequestProto req) throws ServiceException {
+ try {
+ impl.startLogSegment(PBHelper.convert(req.getRegistration()),
+ req.getTxid());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return StartLogSegmentResponseProto.newBuilder().build();
+ }
+
+ /** @see VersionedProtocol#getProtocolVersion */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ // Answered locally: the version is that of the PB protocol itself,
+ // not forwarded to the delegate.
+ return RPC.getProtocolVersion(JournalProtocolPB.class);
+ }
+
+ /**
+ * The client side will redirect getProtocolSignature to
+ * getProtocolSignature2.
+ *
+ * However the RPC layer below on the Server side will call getProtocolVersion
+ * and possibly in the future getProtocolSignature. Hence we still implement
+ * it even though the end client will never call this method.
+ *
+ * @see VersionedProtocol#getProtocolSignature(String, long, int)
+ */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ /**
+ * Don't forward this to the server. The protocol version and signature is
+ * that of {@link JournalProtocol}
+ */
+ if (!protocol.equals(RPC.getProtocolName(JournalProtocolPB.class))) {
+ throw new IOException("Namenode Serverside implements " +
+ RPC.getProtocolName(JournalProtocolPB.class) +
+ ". The following requested protocol is unknown: " + protocol);
+ }
+
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ RPC.getProtocolVersion(JournalProtocolPB.class),
+ JournalProtocolPB.class);
+ }
+
+
+ @Override
+ public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ /**
+ * Don't forward this to the server. The protocol version and signature is
+ * that of {@link JournalProtocolPB}
+ */
+ return ProtocolSignatureWritable.convert(
+ this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
+ }
+}
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link JournalProtocol} interfaces to the RPC server implementing
+ * {@link JournalProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
+ JournalProtocol, Closeable {
+ /** RpcController is not used and hence is set to null */
+ private final static RpcController NULL_CONTROLLER = null;
+ /** Protobuf RPC proxy over which the journal calls are forwarded. */
+ private final JournalProtocolPB rpcProxy;
+
+ public JournalProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
+ Configuration conf) throws IOException {
+ // Register the protobuf RPC engine for this protocol before the proxy
+ // is created.
+ RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
+ rpcProxy = RPC.getProxy(JournalProtocolPB.class,
+ RPC.getProtocolVersion(JournalProtocolPB.class), nameNodeAddr, conf);
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocolName, long clientVersion)
+ throws IOException {
+ return rpcProxy.getProtocolVersion(protocolName, clientVersion);
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ // Redirects to getProtocolSignature2 (R23 compatible) and converts the
+ // writable result back to a ProtocolSignature.
+ return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
+ protocol, clientVersion, clientMethodsHash));
+ }
+
+ /** @see JournalProtocol#journal */
+ @Override
+ public void journal(NamenodeRegistration reg, long firstTxnId,
+ int numTxns, byte[] records) throws IOException {
+ JournalRequestProto req = JournalRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(reg))
+ .setFirstTxnId(firstTxnId)
+ .setNumTxns(numTxns)
+ .setRecords(PBHelper.getByteString(records))
+ .build();
+ try {
+ rpcProxy.journal(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ // Unwrap the remote IOException carried inside the ServiceException.
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ /** @see JournalProtocol#startLogSegment */
+ @Override
+ public void startLogSegment(NamenodeRegistration registration, long txid)
+ throws IOException {
+ StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration))
+ .setTxid(txid)
+ .build();
+ try {
+ rpcProxy.startLogSegment(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy, JournalProtocolPB.class,
+ RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(JournalProtocolPB.class), methodName);
+ }
+}
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that a secondary NameNode uses to communicate with the NameNode.
+ * It's used to get part of the name node state.
+ *
+ * Note: This extends the protocolbuffer service based interface to
+ * add annotations required for security.
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Private
+public interface NamenodeProtocolPB extends
+ NamenodeProtocolService.BlockingInterface, VersionedProtocol {
+ /**
+ * This method is defined to get the protocol signature using
+ * the R23 protocol - hence we have added the suffix of 2 to the method name
+ * to avoid conflict.
+ */
+ public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException;
+}
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link NamenodeProtocolPB} to the
+ * {@link NamenodeProtocol} server implementation.
+ *
+ * Every RPC handler follows the same pattern: convert the protobuf
+ * request into native types via {@link PBHelper}, delegate to the
+ * wrapped {@link NamenodeProtocol}, and wrap any IOException in a
+ * ServiceException as required by the protobuf blocking-service
+ * contract (the RPC layer unwraps it on the client side).
+ */
+public class NamenodeProtocolServerSideTranslatorPB implements
+ NamenodeProtocolPB {
+ /** The server implementation all protobuf requests are forwarded to. */
+ private final NamenodeProtocol impl;
+
+ /** @param impl the {@link NamenodeProtocol} implementation to delegate to */
+ public NamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) {
+ this.impl = impl;
+ }
+
+ /** Forwards to {@link NamenodeProtocol#getBlocks}. */
+ @Override
+ public GetBlocksResponseProto getBlocks(RpcController unused,
+ GetBlocksRequestProto request) throws ServiceException {
+ DatanodeInfo dnInfo = new DatanodeInfo(PBHelper.convert(request
+ .getDatanode()));
+ BlocksWithLocations blocks;
+ try {
+ blocks = impl.getBlocks(dnInfo, request.getSize());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return GetBlocksResponseProto.newBuilder()
+ .setBlocks(PBHelper.convert(blocks)).build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#getBlockKeys}. */
+ @Override
+ public GetBlockKeysResponseProto getBlockKeys(RpcController unused,
+ GetBlockKeysRequestProto request) throws ServiceException {
+ ExportedBlockKeys keys;
+ try {
+ keys = impl.getBlockKeys();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return GetBlockKeysResponseProto.newBuilder()
+ .setKeys(PBHelper.convert(keys)).build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#getTransactionID}. */
+ @Override
+ public GetTransactionIdResponseProto getTransactionId(RpcController unused,
+ GetTransactionIdRequestProto request) throws ServiceException {
+ long txid;
+ try {
+ txid = impl.getTransactionID();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#rollEditLog}. */
+ @Override
+ public RollEditLogResponseProto rollEditLog(RpcController unused,
+ RollEditLogRequestProto request) throws ServiceException {
+ CheckpointSignature signature;
+ try {
+ signature = impl.rollEditLog();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return RollEditLogResponseProto.newBuilder()
+ .setSignature(PBHelper.convert(signature)).build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#errorReport}; empty response. */
+ @Override
+ public ErrorReportResponseProto errorReport(RpcController unused,
+ ErrorReportRequestProto request) throws ServiceException {
+ try {
+ impl.errorReport(PBHelper.convert(request.getRegistration()),
+ request.getErrorCode(), request.getMsg());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return ErrorReportResponseProto.newBuilder().build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#register}. */
+ @Override
+ public RegisterResponseProto register(RpcController unused,
+ RegisterRequestProto request) throws ServiceException {
+ NamenodeRegistration reg;
+ try {
+ reg = impl.register(PBHelper.convert(request.getRegistration()));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return RegisterResponseProto.newBuilder()
+ .setRegistration(PBHelper.convert(reg)).build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#startCheckpoint}. */
+ @Override
+ public StartCheckpointResponseProto startCheckpoint(RpcController unused,
+ StartCheckpointRequestProto request) throws ServiceException {
+ NamenodeCommand cmd;
+ try {
+ cmd = impl.startCheckpoint(PBHelper.convert(request.getRegistration()));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return StartCheckpointResponseProto.newBuilder()
+ .setCommand(PBHelper.convert(cmd)).build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#endCheckpoint}; empty response. */
+ @Override
+ public EndCheckpointResponseProto endCheckpoint(RpcController unused,
+ EndCheckpointRequestProto request) throws ServiceException {
+ try {
+ impl.endCheckpoint(PBHelper.convert(request.getRegistration()),
+ PBHelper.convert(request.getSignature()));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return EndCheckpointResponseProto.newBuilder().build();
+ }
+
+ /** Forwards to {@link NamenodeProtocol#getEditLogManifest}. */
+ @Override
+ public GetEditLogManifestResponseProto getEditLogManifest(
+ RpcController unused, GetEditLogManifestRequestProto request)
+ throws ServiceException {
+ RemoteEditLogManifest manifest;
+ try {
+ manifest = impl.getEditLogManifest(request.getSinceTxId());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return GetEditLogManifestResponseProto.newBuilder()
+ .setManifest(PBHelper.convert(manifest)).build();
+ }
+
+ /** Answered locally; version is that of {@link NamenodeProtocolPB}. */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return RPC.getProtocolVersion(NamenodeProtocolPB.class);
+ }
+
+ /**
+ * The client side will redirect getProtocolSignature to
+ * getProtocolSignature2.
+ *
+ * However the RPC layer below on the Server side will call getProtocolVersion
+ * and possibly in the future getProtocolSignature. Hence we still implement
+ * it even though the end client will never call this method.
+ */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ /**
+ * Don't forward this to the server. The protocol version and signature are
+ * those of {@link NamenodeProtocolPB}
+ */
+ if (!protocol.equals(RPC.getProtocolName(NamenodeProtocolPB.class))) {
+ throw new IOException("Namenode Serverside implements " +
+ RPC.getProtocolName(NamenodeProtocolPB.class) +
+ ". The following requested protocol is unknown: " + protocol);
+ }
+
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ RPC.getProtocolVersion(NamenodeProtocolPB.class),
+ NamenodeProtocolPB.class);
+ }
+
+
+ /** R23-compatible variant; see {@link #getProtocolSignature}. */
+ @Override
+ public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ /**
+ * Don't forward this to the server. The protocol version and signature are
+ * those of {@link NamenodeProtocolPB}
+ */
+ return ProtocolSignatureWritable.convert(
+ this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
+ }
+
+ /** Forwards to {@link NamenodeProtocol#versionRequest}. */
+ @Override
+ public VersionResponseProto versionRequest(RpcController controller,
+ VersionRequestProto request) throws ServiceException {
+ NamespaceInfo info;
+ try {
+ info = impl.versionRequest();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return VersionResponseProto.newBuilder()
+ .setInfo(PBHelper.convert(info)).build();
+ }
+}
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link NamenodeProtocol} interfaces to the RPC server implementing
+ * {@link NamenodeProtocolPB}.
+ *
+ * Each method builds the protobuf request, invokes the proxy with a null
+ * RpcController, and rethrows any ServiceException as the underlying remote
+ * IOException via {@link ProtobufHelper#getRemoteException}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
+ ProtocolMetaInterface, Closeable {
+ /** RpcController is not used and hence is set to null */
+ private final static RpcController NULL_CONTROLLER = null;
+
+ /*
+ * Protobuf requests with no parameters instantiated only once
+ * (protobuf messages are immutable, so sharing is safe)
+ */
+ private static final GetBlockKeysRequestProto GET_BLOCKKEYS =
+ GetBlockKeysRequestProto.newBuilder().build();
+ private static final GetTransactionIdRequestProto GET_TRANSACTIONID =
+ GetTransactionIdRequestProto.newBuilder().build();
+ private static final RollEditLogRequestProto ROLL_EDITLOG =
+ RollEditLogRequestProto.newBuilder().build();
+ private static final VersionRequestProto VERSION_REQUEST =
+ VersionRequestProto.newBuilder().build();
+
+ /** Proxy for the remote NamenodeProtocolPB service; stopped by close(). */
+ final private NamenodeProtocolPB rpcProxy;
+
+ /**
+ * Creates a translator with its own RPC proxy to the given name node
+ * address, using the ProtobufRpcEngine.
+ */
+ public NamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
+ Configuration conf, UserGroupInformation ugi) throws IOException {
+ RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ rpcProxy = RPC.getProxy(NamenodeProtocolPB.class,
+ RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, ugi,
+ conf, NetUtils.getSocketFactory(conf, NamenodeProtocolPB.class));
+ }
+
+ /** Wraps an existing proxy; caller transfers ownership (close() stops it). */
+ public NamenodeProtocolTranslatorPB(NamenodeProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
+ // NOTE(review): implements Closeable.close() but is missing @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ /** Redirects to the server's getProtocolSignature2 (R23 compatibility). */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocolName,
+ long clientVersion, int clientMethodHash) throws IOException {
+ return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
+ protocolName, clientVersion, clientMethodHash));
+ }
+
+ @Override
+ public long getProtocolVersion(String protocolName, long clientVersion)
+ throws IOException {
+ return rpcProxy.getProtocolVersion(protocolName, clientVersion);
+ }
+
+ @Override
+ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+ throws IOException {
+ // Cast up to DatanodeID to select PBHelper's DatanodeID converter
+ GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
+ .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size)
+ .build();
+ try {
+ return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)
+ .getBlocks());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public ExportedBlockKeys getBlockKeys() throws IOException {
+ try {
+ return PBHelper.convert(rpcProxy.getBlockKeys(NULL_CONTROLLER,
+ GET_BLOCKKEYS).getKeys());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public long getTransactionID() throws IOException {
+ try {
+ return rpcProxy.getTransactionId(NULL_CONTROLLER, GET_TRANSACTIONID)
+ .getTxId();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public CheckpointSignature rollEditLog() throws IOException {
+ try {
+ return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,
+ ROLL_EDITLOG).getSignature());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public NamespaceInfo versionRequest() throws IOException {
+ try {
+ return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
+ VERSION_REQUEST).getInfo());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void errorReport(NamenodeRegistration registration, int errorCode,
+ String msg) throws IOException {
+ ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
+ .setErrorCode(errorCode).setMsg(msg)
+ .setRegistration(PBHelper.convert(registration)).build();
+ try {
+ rpcProxy.errorReport(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public NamenodeRegistration register(NamenodeRegistration registration)
+ throws IOException {
+ RegisterRequestProto req = RegisterRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration)).build();
+ try {
+ return PBHelper.convert(rpcProxy.register(NULL_CONTROLLER, req)
+ .getRegistration());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+ throws IOException {
+ StartCheckpointRequestProto req = StartCheckpointRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration)).build();
+ NamenodeCommandProto cmd;
+ try {
+ cmd = rpcProxy.startCheckpoint(NULL_CONTROLLER, req).getCommand();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ return PBHelper.convert(cmd);
+ }
+
+ @Override
+ public void endCheckpoint(NamenodeRegistration registration,
+ CheckpointSignature sig) throws IOException {
+ EndCheckpointRequestProto req = EndCheckpointRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration))
+ .setSignature(PBHelper.convert(sig)).build();
+ try {
+ rpcProxy.endCheckpoint(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+ throws IOException {
+ GetEditLogManifestRequestProto req = GetEditLogManifestRequestProto
+ .newBuilder().setSinceTxId(sinceTxId).build();
+ try {
+ return PBHelper.convert(rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
+ .getManifest());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ /** Asks the server whether it supports the named NamenodeProtocolPB method. */
+ @Override
+ public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy, NamenodeProtocolPB.class,
+ RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(NamenodeProtocolPB.class), methodName);
+ }
+}