You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@crail.apache.org by pe...@apache.org on 2021/02/16 14:42:29 UTC

[incubator-crail] branch master updated (8a739dd -> 9de7d4d)

This is an automated email from the ASF dual-hosted git repository.

pepperjo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-crail.git.


    from 8a739dd  [maven-release-plugin] prepare for next development iteration
     new 7c4557d  implement RPC to remove datanodes
     new 9de7d4d  minor changes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/crail/metadata/DataNodeStatistics.java  | 16 +++++-
 .../org/apache/crail/metadata/DataNodeStatus.java  | 32 ++++++++++++
 .../java/org/apache/crail/rpc/RpcConnection.java   |  4 ++
 .../java/org/apache/crail/rpc/RpcDispatcher.java   | 10 ++++
 .../main/java/org/apache/crail/rpc/RpcErrors.java  |  2 +-
 .../org/apache/crail/rpc/RpcRemoveDataNode.java    |  5 ++
 .../rpc/darpc/DaRPCNameNodeConnection.java         | 19 +++++++
 .../namenode/rpc/darpc/DaRPCNameNodeRequest.java   | 19 ++++++-
 .../namenode/rpc/darpc/DaRPCNameNodeResponse.java  | 27 +++++++++-
 .../crail/namenode/rpc/tcp/TcpNameNodeRequest.java | 23 +++++++--
 .../namenode/rpc/tcp/TcpNameNodeResponse.java      | 26 ++++++++--
 .../crail/namenode/rpc/tcp/TcpRpcConnection.java   | 12 +++++
 .../java/org/apache/crail/rpc/RpcProtocol.java     |  7 ++-
 .../org/apache/crail/rpc/RpcRequestMessage.java    | 60 +++++++++++++++++++++-
 .../org/apache/crail/rpc/RpcResponseMessage.java   | 49 ++++++++++++++++++
 15 files changed, 296 insertions(+), 15 deletions(-)
 create mode 100644 client/src/main/java/org/apache/crail/metadata/DataNodeStatus.java
 create mode 100644 client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java


[incubator-crail] 02/02: minor changes

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pepperjo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-crail.git

commit 9de7d4db3eac405b963b92e28d229b3ecfd3c89e
Author: Malte Brodmann <mb...@student.ethz.ch>
AuthorDate: Tue Feb 16 13:27:27 2021 +0100

    minor changes
---
 .../apache/crail/metadata/DataNodeStatistics.java  | 16 +++++------
 .../org/apache/crail/metadata/DataNodeStatus.java  | 32 ++++++++++++++++++++++
 .../java/org/apache/crail/rpc/RpcDispatcher.java   |  3 ++
 .../main/java/org/apache/crail/rpc/RpcErrors.java  |  1 -
 .../org/apache/crail/rpc/RpcRemoveDataNode.java    |  2 +-
 .../namenode/rpc/darpc/DaRPCNameNodeRequest.java   |  4 +--
 .../namenode/rpc/darpc/DaRPCNameNodeResponse.java  |  4 +--
 .../crail/namenode/rpc/tcp/TcpNameNodeRequest.java |  4 +--
 .../namenode/rpc/tcp/TcpNameNodeResponse.java      |  6 ++--
 .../org/apache/crail/rpc/RpcRequestMessage.java    | 10 +++----
 .../org/apache/crail/rpc/RpcResponseMessage.java   | 20 +++++++-------
 11 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
index 00eebd9..f697a32 100644
--- a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
+++ b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
@@ -22,29 +22,29 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 public class DataNodeStatistics {
-	public static final int CSIZE = 14;
-	
+	public static int CSIZE = DataNodeStatus.CSIZE + 12;
+
+	private DataNodeStatus status;
 	private long serviceId;
 	private int freeBlockCount;
-	private short status;
 	
 	public DataNodeStatistics(){
+		this.status = new DataNodeStatus();
 		this.serviceId = 0;
 		this.freeBlockCount = 0;
-		this.status = 0;
 	}
 	
 	public int write(ByteBuffer buffer){
+		this.status.write(buffer);
 		buffer.putLong(serviceId);
 		buffer.putInt(freeBlockCount);
-		buffer.putShort(status);
 		return CSIZE;
 	}
 	
 	public void update(ByteBuffer buffer) throws UnknownHostException {
+		this.status.update(buffer);
 		this.serviceId = buffer.getLong();
 		this.freeBlockCount = buffer.getInt();
-		this.status = buffer.getShort();
 	}
 
 	public int getFreeBlockCount() {
@@ -55,12 +55,12 @@ public class DataNodeStatistics {
 		this.freeBlockCount = blockCount;
 	}
 
-	public short getStatus() {
+	public DataNodeStatus getStatus() {
 		return this.status;
 	}
 
 	public void setStatus(short status) {
-		this.status = status;
+		this.status.setStatus(status);
 	}
 
 	public void setStatistics(DataNodeStatistics statistics) {
diff --git a/client/src/main/java/org/apache/crail/metadata/DataNodeStatus.java b/client/src/main/java/org/apache/crail/metadata/DataNodeStatus.java
new file mode 100644
index 0000000..2938e1f
--- /dev/null
+++ b/client/src/main/java/org/apache/crail/metadata/DataNodeStatus.java
@@ -0,0 +1,32 @@
+package org.apache.crail.metadata;
+
+import java.nio.ByteBuffer;
+
+public class DataNodeStatus {
+    public static final int CSIZE = 2;
+
+    private short status;
+
+    public static final short STATUS_DATANODE_STOP = 1;
+
+    public DataNodeStatus() {
+        this.status = 0;
+    }
+
+    public int write(ByteBuffer buffer) {
+        buffer.putShort(status);
+        return CSIZE;
+    }
+
+    public void update(ByteBuffer buffer) {
+        this.status = buffer.getShort();
+    }
+
+    public short getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(short status) {
+        this.status = status;
+    }
+}
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
index 605193c..4ecbdf2 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
@@ -138,6 +138,9 @@ public class RpcDispatcher implements RpcConnection {
 	@Override
 	public RpcFuture<RpcRemoveDataNode> removeDataNode(
 			InetAddress ipaddr, int port) throws Exception {
+		if(connections.length > 1) {
+			throw new Exception("removeDataNode RPC currently not supported in multi-namenode environments");
+		}
 		return connections[0].removeDataNode(ipaddr, port);
 	}
 
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
index 199f130..a223e05 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
@@ -55,7 +55,6 @@ public class RpcErrors {
 	public static short ERR_DIR_LOCATION_AFFINITY_MISMATCH = 26;
 	public static short ERR_ADD_BLOCK_FAILED = 27;
 	public static short ERR_CREATE_FILE_BUG = 28;
-	public static short ERR_DATANODE_STOP = 29;
 
 	static {
 		messages[ERR_OK] = "ERROR: No error, all fine";
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
index ad3db57..8e70a5d 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
@@ -1,5 +1,5 @@
 package org.apache.crail.rpc;
 
 public interface RpcRemoveDataNode extends RpcResponse {
-    public short getData();
+    public short getRpcStatus();
 }
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
index f496edf..d362441 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
@@ -270,11 +270,11 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
 		return this.dumpNameNodeReq;
 	}
 	
-	public RpcRequestMessage.PingNameNodeReq pingNameNode(){
+	public RpcRequestMessage.PingNameNodeReq pingNameNode() {
 		return this.pingNameNodeReq;
 	}
 
-	public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+	public RpcRequestMessage.RemoveDataNodeReq removeDataNode() {
 		return this.removeDataNodeReq;
 	}
 }
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
index bd1d84b..1a34586 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
@@ -301,11 +301,11 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 		return getDataNodeRes;
 	}	
 	
-	public RpcResponseMessage.PingNameNodeRes pingNameNode(){
+	public RpcResponseMessage.PingNameNodeRes pingNameNode() {
 		return this.pingNameNodeRes;
 	}
 
-	public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+	public RpcResponseMessage.RemoveDataNodeRes removeDataNode() {
 		return this.removeDataNodeRes;
 	}
 }
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
index 37e5503..b9f141a 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
@@ -270,11 +270,11 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag
 		return this.dumpNameNodeReq;
 	}
 	
-	public RpcRequestMessage.PingNameNodeReq pingNameNode(){
+	public RpcRequestMessage.PingNameNodeReq pingNameNode() {
 		return this.pingNameNodeReq;
 	}
 
-	public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+	public RpcRequestMessage.RemoveDataNodeReq removeDataNode() {
 		return this.removeDataNodeReq;
 	}
 }
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
index 8a01a19..9c76230 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
@@ -250,12 +250,12 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo
 	public RpcResponseMessage.GetDataNodeRes getDataNode() {
 		return getDataNodeRes;
 	}	
-	
-	public RpcResponseMessage.PingNameNodeRes pingNameNode(){
+
+	public RpcResponseMessage.PingNameNodeRes pingNameNode() {
 		return this.pingNameNodeRes;
 	}
 
-	public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+	public RpcResponseMessage.RemoveDataNodeRes removeDataNode() {
 		return this.removeDataNodeRes;
 	}
 }
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
index 7d1f414..8055fb3 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
@@ -579,12 +579,12 @@ public class RpcRequestMessage {
 		private InetAddress ipAddr;
 		private int port;
 
-		public RemoveDataNodeReq(){
+		public RemoveDataNodeReq() {
 			this.ipAddr = null;
 			this.port = 0;
 		}
 
-		public RemoveDataNodeReq(InetAddress addr, int port){
+		public RemoveDataNodeReq(InetAddress addr, int port) {
 			this.ipAddr = addr;
 			this.port = port;
 		}
@@ -594,7 +594,7 @@ public class RpcRequestMessage {
 			return 4 + Integer.BYTES;
 		}
 
-		public short getType(){
+		public short getType() {
 			return RpcProtocol.REQ_REMOVE_DATANODE;
 		}
 
@@ -622,11 +622,11 @@ public class RpcRequestMessage {
 			this.port = buffer.getInt();
 		}
 
-		public InetAddress getIPAddress(){
+		public InetAddress getIPAddress() {
 			return this.ipAddr;
 		}
 
-		public int port(){
+		public int port() {
 			return this.port;
 		}
 	}
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
index 32697ea..37377f3 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
@@ -660,11 +660,11 @@ public class RpcResponseMessage {
 	public static class RemoveDataNodeRes implements RpcProtocol.NameNodeRpcMessage, RpcRemoveDataNode {
 		public static int CSIZE = Short.BYTES;
 
-		private short data;
+		private short rpcStatus;
 		private short error;
 
 		public RemoveDataNodeRes() {
-			this.data = 0;
+			this.rpcStatus = 0;
 			this.error = 0;
 		}
 
@@ -672,28 +672,28 @@ public class RpcResponseMessage {
 			return CSIZE;
 		}
 
-		public short getType(){
+		public short getType() {
 			return RpcProtocol.RES_REMOVE_DATANODE;
 		}
 
 		public int write(ByteBuffer buffer) {
-			buffer.putShort(data);
+			buffer.putShort(rpcStatus);
 			return CSIZE;
 		}
 
 		public void update(ByteBuffer buffer) {
-			data = buffer.getShort();
+			rpcStatus = buffer.getShort();
 		}
 
-		public short getData(){
-			return data;
+		public short getRpcStatus() {
+			return rpcStatus;
 		}
 
-		public void setData(short data) {
-			this.data = data;
+		public void setRpcStatus(short rpcStatus) {
+			this.rpcStatus = rpcStatus;
 		}
 
-		public short getError(){
+		public short getError() {
 			return error;
 		}
 


[incubator-crail] 01/02: implement RPC to remove datanodes

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pepperjo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-crail.git

commit 7c4557df39a955dce9dde15762af95fee0eb6f7f
Author: Malte Brodmann <mb...@student.ethz.ch>
AuthorDate: Fri Feb 5 15:37:24 2021 +0100

    implement RPC to remove datanodes
---
 .../apache/crail/metadata/DataNodeStatistics.java  | 14 ++++-
 .../java/org/apache/crail/rpc/RpcConnection.java   |  4 ++
 .../java/org/apache/crail/rpc/RpcDispatcher.java   |  7 +++
 .../main/java/org/apache/crail/rpc/RpcErrors.java  |  3 +-
 .../org/apache/crail/rpc/RpcRemoveDataNode.java    |  5 ++
 .../rpc/darpc/DaRPCNameNodeConnection.java         | 19 +++++++
 .../namenode/rpc/darpc/DaRPCNameNodeRequest.java   | 17 ++++++
 .../namenode/rpc/darpc/DaRPCNameNodeResponse.java  | 25 ++++++++-
 .../crail/namenode/rpc/tcp/TcpNameNodeRequest.java | 21 +++++++-
 .../namenode/rpc/tcp/TcpNameNodeResponse.java      | 22 +++++++-
 .../crail/namenode/rpc/tcp/TcpRpcConnection.java   | 12 +++++
 .../java/org/apache/crail/rpc/RpcProtocol.java     |  7 ++-
 .../org/apache/crail/rpc/RpcRequestMessage.java    | 60 +++++++++++++++++++++-
 .../org/apache/crail/rpc/RpcResponseMessage.java   | 49 ++++++++++++++++++
 14 files changed, 256 insertions(+), 9 deletions(-)

diff --git a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
index fc52f14..00eebd9 100644
--- a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
+++ b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
@@ -22,25 +22,29 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 public class DataNodeStatistics {
-	public static final int CSIZE = 12;
+	public static final int CSIZE = 14;
 	
 	private long serviceId;
 	private int freeBlockCount;
+	private short status;
 	
 	public DataNodeStatistics(){
 		this.serviceId = 0;
 		this.freeBlockCount = 0;
+		this.status = 0;
 	}
 	
 	public int write(ByteBuffer buffer){
 		buffer.putLong(serviceId);
 		buffer.putInt(freeBlockCount);
+		buffer.putShort(status);
 		return CSIZE;
 	}
 	
 	public void update(ByteBuffer buffer) throws UnknownHostException {
 		this.serviceId = buffer.getLong();
 		this.freeBlockCount = buffer.getInt();
+		this.status = buffer.getShort();
 	}
 
 	public int getFreeBlockCount() {
@@ -51,6 +55,14 @@ public class DataNodeStatistics {
 		this.freeBlockCount = blockCount;
 	}
 
+	public short getStatus() {
+		return this.status;
+	}
+
+	public void setStatus(short status) {
+		this.status = status;
+	}
+
 	public void setStatistics(DataNodeStatistics statistics) {
 		this.serviceId = statistics.getServiceId();
 		this.freeBlockCount = statistics.getFreeBlockCount();
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
index d434fdb..91ae152 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
@@ -19,6 +19,7 @@
 package org.apache.crail.rpc;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 import org.apache.crail.CrailNodeType;
 import org.apache.crail.metadata.BlockInfo;
@@ -59,6 +60,9 @@ public interface RpcConnection {
 
 	public abstract RpcFuture<RpcPing> pingNameNode()
 			throws Exception;
+
+	public abstract RpcFuture<RpcRemoveDataNode> removeDataNode(
+			InetAddress ipaddr, int port) throws Exception;
 	
 	public abstract void close() throws Exception;
 	
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
index f8d38f5..605193c 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.crail.rpc;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.crail.CrailNodeType;
@@ -135,6 +136,12 @@ public class RpcDispatcher implements RpcConnection {
 	}
 
 	@Override
+	public RpcFuture<RpcRemoveDataNode> removeDataNode(
+			InetAddress ipaddr, int port) throws Exception {
+		return connections[0].removeDataNode(ipaddr, port);
+	}
+
+	@Override
 	public void close() throws Exception {
 		for (RpcConnection connection : connections){
 			connection.close();
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
index 367e772..199f130 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
@@ -55,7 +55,8 @@ public class RpcErrors {
 	public static short ERR_DIR_LOCATION_AFFINITY_MISMATCH = 26;
 	public static short ERR_ADD_BLOCK_FAILED = 27;
 	public static short ERR_CREATE_FILE_BUG = 28;
-	
+	public static short ERR_DATANODE_STOP = 29;
+
 	static {
 		messages[ERR_OK] = "ERROR: No error, all fine";
 		messages[ERR_UNKNOWN] = "ERROR: Unknown error";
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
new file mode 100644
index 0000000..ad3db57
--- /dev/null
+++ b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
@@ -0,0 +1,5 @@
+package org.apache.crail.rpc;
+
+public interface RpcRemoveDataNode extends RpcResponse {
+    public short getData();
+}
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
index 26409f7..7b53855 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
@@ -19,6 +19,7 @@
 package org.apache.crail.namenode.rpc.darpc;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 import org.apache.crail.CrailNodeType;
 import org.apache.crail.conf.CrailConstants;
@@ -35,6 +36,7 @@ import org.apache.crail.rpc.RpcGetDataNode;
 import org.apache.crail.rpc.RpcGetFile;
 import org.apache.crail.rpc.RpcGetLocation;
 import org.apache.crail.rpc.RpcPing;
+import org.apache.crail.rpc.RpcRemoveDataNode;
 import org.apache.crail.rpc.RpcProtocol;
 import org.apache.crail.rpc.RpcRenameFile;
 import org.apache.crail.rpc.RpcRequestMessage;
@@ -268,6 +270,23 @@ public class DaRPCNameNodeConnection implements RpcConnection {
 		
 		return nameNodeFuture;	
 	}
+
+	@Override
+	public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress ipaddr, int port) throws Exception {
+
+		RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq(ipaddr, port);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeDataNodeReq);
+		request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE);
+
+		RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(removeDataNodeRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcRemoveDataNode> nameNodeFuture = new DaRPCNameNodeFuture<RpcRemoveDataNode>(future, removeDataNodeRes);
+		
+		return nameNodeFuture;
+	}
 	
 	@Override
 	public void close() throws Exception {
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
index f87ba9d..f496edf 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
@@ -45,6 +45,7 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
 	private RpcRequestMessage.GetDataNodeReq getDataNodeReq;
 	private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq;
 	private RpcRequestMessage.PingNameNodeReq pingNameNodeReq;
+	private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq;
 
 	public DaRPCNameNodeRequest() {
 		this.cmd = 0;
@@ -60,6 +61,7 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
 		this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
 		this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq();
 		this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq();
+		this.removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq();
 	}
 	
 	public DaRPCNameNodeRequest(RpcRequestMessage.CreateFileReq message) {
@@ -115,6 +117,11 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
 		this.type = message.getType();
 		this.pingNameNodeReq = message;
 	}
+
+	public DaRPCNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq message) {
+		this.type = message.getType();
+		this.removeDataNodeReq = message;
+	}
 	
 	public void setCommand(short command) {
 		this.cmd = command;
@@ -163,6 +170,9 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
 		case RpcProtocol.REQ_PING_NAMENODE:
 			written += pingNameNodeReq.write(buffer);
 			break;
+		case RpcProtocol.REQ_REMOVE_DATANODE:
+			written += removeDataNodeReq.write(buffer);
+			break;
 		}
 		
 		return written;
@@ -206,6 +216,9 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
 		case RpcProtocol.REQ_PING_NAMENODE:
 			pingNameNodeReq.update(buffer);
 			break;
+		case RpcProtocol.REQ_REMOVE_DATANODE:
+			removeDataNodeReq.update(buffer);
+			break;
 		}
 	}
 
@@ -260,4 +273,8 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
 	public RpcRequestMessage.PingNameNodeReq pingNameNode(){
 		return this.pingNameNodeReq;
 	}
+
+	public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+		return this.removeDataNodeReq;
+	}
 }
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
index a4e75f4..bd1d84b 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
@@ -40,6 +40,7 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 	private RpcResponseMessage.GetLocationRes getLocationRes;	
 	private RpcResponseMessage.GetDataNodeRes getDataNodeRes;
 	private RpcResponseMessage.PingNameNodeRes pingNameNodeRes;
+	private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes;
 	
 	public DaRPCNameNodeResponse() {
 		this.type = 0;
@@ -54,6 +55,7 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 		this.getLocationRes = new RpcResponseMessage.GetLocationRes();
 		this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
 		this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes();
+		this.removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes();
 	}
 	
 	public DaRPCNameNodeResponse(RpcResponseMessage.VoidRes message) {
@@ -100,6 +102,11 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 		this.type = message.getType();
 		this.pingNameNodeRes = message;
 	}
+
+	public DaRPCNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes message) {
+		this.type = message.getType();
+		this.removeDataNodeRes = message;
+	}
 	
 	public void setType(short type) throws Exception {
 		this.type = type;
@@ -149,6 +156,11 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 				throw new Exception("Response type not set");
 			}
 			break;
+		case RpcProtocol.RES_REMOVE_DATANODE:
+			if (removeDataNodeRes == null){
+				throw new Exception("Response type not set");
+			}
+			break;
 		}		
 	}	
 
@@ -188,7 +200,10 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 			break;			
 		case RpcProtocol.RES_PING_NAMENODE:
 			written += pingNameNodeRes.write(buffer);
-			break;			
+			break;		
+		case RpcProtocol.RES_REMOVE_DATANODE:
+			written += removeDataNodeRes.write(buffer);
+			break;		
 		}
 		
 		return written;
@@ -235,6 +250,10 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 			pingNameNodeRes.update(buffer);
 			pingNameNodeRes.setError(error);
 			break;		
+		case RpcProtocol.RES_REMOVE_DATANODE:
+			removeDataNodeRes.update(buffer);
+			removeDataNodeRes.setError(error);
+			break;		
 		}
 	}
 	
@@ -285,4 +304,8 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState {
 	public RpcResponseMessage.PingNameNodeRes pingNameNode(){
 		return this.pingNameNodeRes;
 	}
+
+	public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+		return this.removeDataNodeRes;
+	}
 }
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
index d22e09a..37e5503 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
@@ -45,6 +45,7 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag
 	private RpcRequestMessage.GetDataNodeReq getDataNodeReq;
 	private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq;
 	private RpcRequestMessage.PingNameNodeReq pingNameNodeReq;
+	private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq;
 
 	public TcpNameNodeRequest() {
 		this.cmd = 0;
@@ -60,7 +61,8 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag
 		this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
 		this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq();
 		this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq();
-	}	
+		this.removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq();
+	}
 	
 	public TcpNameNodeRequest(RpcRequestMessage.CreateFileReq message) {
 		this.type = message.getType();
@@ -115,7 +117,12 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag
 		this.type = message.getType();
 		this.pingNameNodeReq = message;
 	}
-	
+
+	public TcpNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq message) {
+		this.type = message.getType();
+		this.removeDataNodeReq = message;
+	}
+
 	public void setCommand(short command) {
 		this.cmd = command;
 	}	
@@ -163,6 +170,9 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag
 		case RpcProtocol.REQ_PING_NAMENODE:
 			written += pingNameNodeReq.write(buffer);
 			break;
+		case RpcProtocol.REQ_REMOVE_DATANODE:
+			written += removeDataNodeReq.write(buffer);
+			break;
 		}
 		
 		return written;
@@ -206,6 +216,9 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag
 		case RpcProtocol.REQ_PING_NAMENODE:
 			pingNameNodeReq.update(buffer);
 			break;
+		case RpcProtocol.REQ_REMOVE_DATANODE:
+			removeDataNodeReq.update(buffer);
+			break;
 		}
 	}
 
@@ -260,4 +273,8 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag
 	public RpcRequestMessage.PingNameNodeReq pingNameNode(){
 		return this.pingNameNodeReq;
 	}
+
+	public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+		return this.removeDataNodeReq;
+	}
 }
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
index 73912df..8a01a19 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
@@ -43,6 +43,7 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo
 	private RpcResponseMessage.GetLocationRes getLocationRes;	
 	private RpcResponseMessage.GetDataNodeRes getDataNodeRes;
 	private RpcResponseMessage.PingNameNodeRes pingNameNodeRes;
+	private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes;
 	
 	public TcpNameNodeResponse() {
 		this.type = 0;
@@ -56,6 +57,7 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo
 		this.getLocationRes = new RpcResponseMessage.GetLocationRes();
 		this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
 		this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes();
+		this.removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes();
 	}
 	
 	public TcpNameNodeResponse(RpcResponseMessage.VoidRes message) {
@@ -102,6 +104,11 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo
 		this.type = message.getType();
 		this.pingNameNodeRes = message;
 	}
+
+	public TcpNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes message) {
+		this.type = message.getType();
+		this.removeDataNodeRes = message;
+	}
 	
 	public void setType(short type) throws Exception {
 		this.type = type;
@@ -143,7 +150,10 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo
 			break;			
 		case RpcProtocol.RES_PING_NAMENODE:
 			written += pingNameNodeRes.write(buffer);
-			break;			
+			break;
+		case RpcProtocol.RES_REMOVE_DATANODE:
+			written += removeDataNodeRes.write(buffer);
+			break;
 		}
 		
 		return written;
@@ -189,7 +199,11 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo
 		case RpcProtocol.RES_PING_NAMENODE:
 			pingNameNodeRes.update(buffer);
 			pingNameNodeRes.setError(error);
-			break;		
+			break;
+		case RpcProtocol.RES_REMOVE_DATANODE:
+			removeDataNodeRes.update(buffer);
+			removeDataNodeRes.setError(error);
+			break;
 		}
 	}
 	
@@ -240,4 +254,8 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo
 	public RpcResponseMessage.PingNameNodeRes pingNameNode(){
 		return this.pingNameNodeRes;
 	}
+
+	public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+		return this.removeDataNodeRes;
+	}
 }
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
index decdf1c..6f96e4f 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
@@ -31,6 +31,7 @@ import org.apache.crail.utils.CrailUtils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 public class TcpRpcConnection implements RpcConnection {
 	static private final Logger LOG = CrailUtils.getLogger();
@@ -184,4 +185,15 @@ public class TcpRpcConnection implements RpcConnection {
 		return new TcpFuture<RpcPing>(future, resp);
 	}
 
+	public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress addr, int port) throws Exception {
+		// Wrap both payloads in their TCP envelopes; removeRes is the object handed to the returned future.
+		RpcResponseMessage.RemoveDataNodeRes removeRes = new RpcResponseMessage.RemoveDataNodeRes();
+		TcpNameNodeResponse replyEnvelope = new TcpNameNodeResponse(removeRes);
+		TcpNameNodeRequest callEnvelope = new TcpNameNodeRequest(new RpcRequestMessage.RemoveDataNodeReq(addr, port));
+		callEnvelope.setCommand(RpcProtocol.CMD_REMOVE_DATANODE);
+
+		NaRPCFuture<TcpNameNodeRequest, TcpNameNodeResponse> pending = endpoint.issueRequest(callEnvelope, replyEnvelope);
+		return new TcpFuture<RpcRemoveDataNode>(pending, removeRes);
+	}
+
 }
\ No newline at end of file
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
index d5a339d..10c32d3 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
@@ -40,6 +40,7 @@ public class RpcProtocol extends RpcErrors {
 	public static final short CMD_DUMP_NAMENODE = 10;
 	public static final short CMD_PING_NAMENODE = 11;
 	public static final short CMD_GET_DATANODE = 12;
+	public static final short CMD_REMOVE_DATANODE = 13;
 	
 	//request types
 	public static final short REQ_CREATE_FILE = 1;	
@@ -53,7 +54,8 @@ public class RpcProtocol extends RpcErrors {
 	public static final short REQ_DUMP_NAMENODE = 10;
 	public static final short REQ_PING_NAMENODE = 11;
 	public static final short REQ_GET_DATANODE = 12;
-	
+	public static final short REQ_REMOVE_DATANODE = 13;
+
 	//response types
 	public static final short RES_VOID = 1;
 	public static final short RES_CREATE_FILE = 2;
@@ -64,6 +66,7 @@ public class RpcProtocol extends RpcErrors {
 	public static final short RES_GET_LOCATION = 7;
 	public static final short RES_PING_NAMENODE = 9;
 	public static final short RES_GET_DATANODE = 10;
+	public static final short RES_REMOVE_DATANODE = 11;
 	
 	
 	static {
@@ -79,6 +82,7 @@ public class RpcProtocol extends RpcErrors {
 		requestTypes[CMD_DUMP_NAMENODE] = REQ_DUMP_NAMENODE;
 		requestTypes[CMD_PING_NAMENODE] = REQ_PING_NAMENODE;	
 		requestTypes[CMD_GET_DATANODE] = REQ_GET_DATANODE;
+		requestTypes[CMD_REMOVE_DATANODE] = REQ_REMOVE_DATANODE;
 		
 		responseTypes[0] = 0;
 		responseTypes[CMD_CREATE_FILE] = RES_CREATE_FILE;
@@ -92,6 +96,7 @@ public class RpcProtocol extends RpcErrors {
 		responseTypes[CMD_DUMP_NAMENODE] = RES_VOID;
 		responseTypes[CMD_PING_NAMENODE] = RES_PING_NAMENODE;	
 		responseTypes[CMD_GET_DATANODE] = RES_GET_DATANODE;
+		responseTypes[CMD_REMOVE_DATANODE] = RES_REMOVE_DATANODE;
 	}
 	
 
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
index be043a2..7d1f414 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
@@ -18,6 +18,8 @@
 
 package org.apache.crail.rpc;
 
+import java.io.IOException;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
@@ -571,7 +573,63 @@ public class RpcRequestMessage {
 		public void update(ByteBuffer buffer) {
 			op = buffer.getInt();
 		}		
-	}	
+	}
+
+	/** Request of the remove-datanode RPC; identifies the datanode by IPv4 address and port. */
+	public static class RemoveDataNodeReq implements RpcProtocol.NameNodeRpcMessage {
+		private static final int IPV4_LENGTH = 4; // raw address bytes on the wire
+		private InetAddress ipAddr;
+		private int port;
+
+		public RemoveDataNodeReq(){
+			this(null, 0);
+		}
+
+		public RemoveDataNodeReq(InetAddress addr, int port){
+			this.ipAddr = addr;
+			this.port = port;
+		}
+
+		public int size() {
+			return IPV4_LENGTH + Integer.BYTES; // sizeof(ip-addr) + sizeof(port)
+		}
+
+		public short getType(){
+			return RpcProtocol.REQ_REMOVE_DATANODE;
+		}
+
+		/** Writes address then port; throws IOException if the buffer is too small or the address is not IPv4. */
+		public int write(ByteBuffer buffer) throws IOException {
+			checkSize(buffer.remaining());
+			byte[] raw = (this.ipAddr == null) ? null : this.ipAddr.getAddress();
+			if(raw == null || raw.length != IPV4_LENGTH) // IPv6 is 16 bytes and would overrun the 4-byte slot
+				throw new IOException("RemoveDataNodeReq requires an IPv4 address, got " + this.ipAddr);
+			buffer.put(raw);
+			buffer.putInt(this.port);
+			return size();
+		}
+
+		private void checkSize(int remaining) throws IOException {
+			if(this.size() > remaining)
+				throw new IOException("Only " + remaining + " remaining bytes stored in buffer, however " + this.size() + " bytes are required");
+		}
+
+		public void update(ByteBuffer buffer) throws IOException {
+			checkSize(buffer.remaining());
+			byte[] b = new byte[IPV4_LENGTH]; // same field order as write(): address, then port
+			buffer.get(b);
+			this.ipAddr = InetAddress.getByAddress(b);
+			this.port = buffer.getInt();
+		}
+
+		public InetAddress getIPAddress(){
+			return this.ipAddr;
+		}
+
+		public int port(){
+			return this.port;
+		}
+	}
 	
 
 }
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
index d4175af..32697ea 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
@@ -598,6 +598,10 @@ public class RpcResponseMessage {
 		public void setFreeBlockCount(int blockCount) {
 			this.statistics.setFreeBlockCount(blockCount);
 		}
+
+		public void setStatus(short status) {
+			this.statistics.setStatus(status); // forwards the value to this.statistics
+		}
 		
 		public short getError(){
 			return 0;
@@ -652,4 +656,49 @@ public class RpcResponseMessage {
 			this.error = error;
 		}
 	}
+
+	/** Response of the remove-datanode RPC; its serialized payload is a single short ({@code data}). */
+	public static class RemoveDataNodeRes implements RpcProtocol.NameNodeRpcMessage, RpcRemoveDataNode {
+		public static final int CSIZE = Short.BYTES; // fixed payload size; final so it cannot be reassigned
+		private short data;
+		private short error; // not written by write(); assigned through setError()
+
+		public RemoveDataNodeRes() {
+			this.data = 0;
+			this.error = 0;
+		}
+
+		public int size() {
+			return CSIZE;
+		}
+
+		public short getType(){
+			return RpcProtocol.RES_REMOVE_DATANODE;
+		}
+
+		public int write(ByteBuffer buffer) {
+			buffer.putShort(data);
+			return CSIZE;
+		}
+
+		public void update(ByteBuffer buffer) {
+			data = buffer.getShort();
+		}
+
+		public short getData(){
+			return data;
+		}
+
+		public void setData(short data) {
+			this.data = data;
+		}
+
+		public short getError(){
+			return error;
+		}
+
+		public void setError(short error) {
+			this.error = error;
+		}
+	}
 }