You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crail.apache.org by pe...@apache.org on 2018/02/23 13:52:10 UTC

incubator-crail git commit: NaRPC: enable multi-core dispatcher

Repository: incubator-crail
Updated Branches:
  refs/heads/master 7ea8753a6 -> d58abd828


NaRPC: enable multi-core dispatcher

Enable the multi-core dispatcher when Crail is configured to use NaRPC
(for both the RPC and storage tiers).

Fixes JIRA ticket CRAIL-9.

Close #5

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/d58abd82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/d58abd82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/d58abd82

Branch: refs/heads/master
Commit: d58abd828245815f450d2342ff5df6505ea704e6
Parents: 7ea8753
Author: Patrick Stuedi <st...@zurich.ibm.com>
Authored: Fri Feb 23 13:59:02 2018 +0100
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Fri Feb 23 14:51:18 2018 +0100

----------------------------------------------------------------------
 pom.xml                                              |  2 +-
 .../crail/namenode/rpc/tcp/TcpNameNodeServer.java    |  2 +-
 .../crail/namenode/rpc/tcp/TcpRpcConstants.java      |  7 +++++++
 .../crail/storage/tcp/TcpStorageConstants.java       | 15 +++++++++++++++
 .../apache/crail/storage/tcp/TcpStorageServer.java   |  2 +-
 5 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eb64c90..720f078 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
       <dependency>
         <groupId>com.ibm.narpc</groupId>
         <artifactId>narpc</artifactId>
-        <version>1.0</version>
+        <version>1.1</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
----------------------------------------------------------------------
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
index 7c2f78d..60b1833 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
@@ -49,7 +49,7 @@ public class TcpNameNodeServer extends RpcServer {
 		TcpRpcConstants.verify();
 		this.serverGroup = new NaRPCServerGroup<TcpNameNodeRequest, TcpNameNodeResponse>(
 				dispatcher, TcpRpcConstants.NAMENODE_TCP_QUEUEDEPTH,
-				TcpRpcConstants.NAMENODE_TCP_MESSAGESIZE, true);
+				TcpRpcConstants.NAMENODE_TCP_MESSAGESIZE, true, TcpRpcConstants.NAMENODE_TCP_CORES);
 		this.serverEndpoint = serverGroup.createServerEndpoint();
 		InetSocketAddress inetSocketAddress = CrailUtils.getNameNodeAddress();
 		serverEndpoint.bind(inetSocketAddress);

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
----------------------------------------------------------------------
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
index 407d366..05a0b10 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
@@ -34,6 +34,9 @@ public class TcpRpcConstants {
 	public static final String NAMENODE_TCP_MESSAGESIZE_KEY = "crail.namenode.tcp.messageSize";
 	public static int NAMENODE_TCP_MESSAGESIZE = 512;	
 	
+	public static final String NAMENODE_TCP_CORES_KEY = "crail.namenode.tcp.cores";
+	public static int NAMENODE_TCP_CORES = 1;	
+	
 	public static void updateConstants(CrailConfiguration conf){
 		if (conf.get(NAMENODE_TCP_QUEUEDEPTH_KEY) != null) {
 			NAMENODE_TCP_QUEUEDEPTH = Integer.parseInt(conf.get(NAMENODE_TCP_QUEUEDEPTH_KEY));
@@ -41,6 +44,9 @@ public class TcpRpcConstants {
 		if (conf.get(NAMENODE_TCP_MESSAGESIZE_KEY) != null) {
 			NAMENODE_TCP_MESSAGESIZE = Integer.parseInt(conf.get(NAMENODE_TCP_MESSAGESIZE_KEY));
 		}		
+		if (conf.get(NAMENODE_TCP_CORES_KEY) != null) {
+			NAMENODE_TCP_CORES = Integer.parseInt(conf.get(NAMENODE_TCP_CORES_KEY));
+		}		
 	}
 	
 	public static void verify() throws IOException {
@@ -49,5 +55,6 @@ public class TcpRpcConstants {
 	public static void printConf(Logger logger) {
 		LOG.info(NAMENODE_TCP_QUEUEDEPTH_KEY + " " + NAMENODE_TCP_QUEUEDEPTH);
 		LOG.info(NAMENODE_TCP_MESSAGESIZE_KEY + " " + NAMENODE_TCP_MESSAGESIZE);
+		LOG.info(NAMENODE_TCP_CORES_KEY + " " + NAMENODE_TCP_CORES);
 	}	
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
----------------------------------------------------------------------
diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
index 55c882e..885f8c1 100644
--- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
+++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
@@ -53,11 +53,16 @@ public class TcpStorageConstants {
 	public static final String STORAGE_TCP_QUEUE_DEPTH_KEY = "crail.storage.tcp.queuedepth";
 	public static int STORAGE_TCP_QUEUE_DEPTH = 16;	
 	
+	public static final String STORAGE_TCP_CORES_KEY = "crail.storage.tcp.cores";
+	public static int STORAGE_TCP_CORES = 1;		
+	
     public static void init(CrailConfiguration conf, String[] args) throws Exception {
         if (args != null) {
                 Option portOption = Option.builder("p").desc("port to start server on").hasArg().build();
+                Option coresOption = Option.builder("c").desc("number of cores to use").hasArg().build();
                 Options options = new Options();
                 options.addOption(portOption);
+                options.addOption(coresOption);
                 CommandLineParser parser = new DefaultParser();
 
                 try {
@@ -67,6 +72,12 @@ public class TcpStorageConstants {
                                 LOG.info("using custom port " + port);
                                 conf.set(TcpStorageConstants.STORAGE_TCP_PORT_KEY, port);
                         }
+                        if (line.hasOption(coresOption.getOpt())) {
+                            String cores = line.getOptionValue(coresOption.getOpt());
+                            LOG.info("number of cores used is " + cores);
+                            conf.set(TcpStorageConstants.STORAGE_TCP_CORES_KEY, cores);
+                        }                        
+                        
                 } catch (ParseException e) {
                         HelpFormatter formatter = new HelpFormatter();
                         formatter.printHelp("RDMA storage tier", options);
@@ -95,6 +106,9 @@ public class TcpStorageConstants {
 		}	
 		if (conf.get(STORAGE_TCP_QUEUE_DEPTH_KEY) != null) {
 			STORAGE_TCP_QUEUE_DEPTH = Integer.parseInt(conf.get(STORAGE_TCP_QUEUE_DEPTH_KEY));
+		}
+		if (conf.get(STORAGE_TCP_CORES_KEY) != null) {
+			STORAGE_TCP_CORES = Integer.parseInt(conf.get(STORAGE_TCP_CORES_KEY));
 		}		
 	}	
 	
@@ -105,6 +119,7 @@ public class TcpStorageConstants {
 		logger.info(STORAGE_TCP_ALLOCATION_SIZE_KEY + " " + STORAGE_TCP_ALLOCATION_SIZE);
 		logger.info(STORAGE_TCP_DATA_PATH_KEY + " " + STORAGE_TCP_DATA_PATH);
 		logger.info(STORAGE_TCP_QUEUE_DEPTH_KEY + " " + STORAGE_TCP_QUEUE_DEPTH);
+		logger.info(STORAGE_TCP_CORES_KEY + " " + STORAGE_TCP_CORES);
 	}	
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
----------------------------------------------------------------------
diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
index 9ed2260..f415f07 100644
--- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
+++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
@@ -61,7 +61,7 @@ public class TcpStorageServer implements Runnable, StorageServer, NaRPCService<T
 	public void init(CrailConfiguration conf, String[] args) throws Exception {
 		TcpStorageConstants.init(conf, args);
 		
-		this.serverGroup = new NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse>(this, TcpStorageConstants.STORAGE_TCP_QUEUE_DEPTH, (int) CrailConstants.BLOCK_SIZE*2, false);
+		this.serverGroup = new NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse>(this, TcpStorageConstants.STORAGE_TCP_QUEUE_DEPTH, (int) CrailConstants.BLOCK_SIZE*2, false, TcpStorageConstants.STORAGE_TCP_CORES);
 		this.serverEndpoint = serverGroup.createServerEndpoint();
 		this.address = getDataNodeAddress();
 		serverEndpoint.bind(address);