You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crail.apache.org by pe...@apache.org on 2018/04/10 11:19:40 UTC

[01/16] incubator-crail git commit: New NVMf storage tier: use jNVMf library

Repository: incubator-crail
Updated Branches:
  refs/heads/master c053477a7 -> dc50fc193


New NVMf storage tier: use jNVMf library

New NVMf storage tier implementation which uses the jNVMf library instead
of SPDK. We no longer implement unaligned reads, since we believe the
semantics of the underlying storage system should not be hidden like
this. We guarantee good performance when using buffered streams, since
they now try to align accesses whenever possible.

https://issues.apache.org/jira/browse/CRAIL-22

Close #16

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/29be91d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/29be91d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/29be91d2

Branch: refs/heads/master
Commit: 29be91d2f00e779fb5dc37575db9e5e763ea6db0
Parents: c053477
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Thu Mar 1 17:29:34 2018 +0100
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:18:19 2018 +0200

----------------------------------------------------------------------
 storage-nvmf/pom.xml                            |   5 +-
 .../crail/storage/nvmf/NvmfBufferCache.java     |  69 ----
 .../crail/storage/nvmf/NvmfStorageClient.java   |  43 ++-
 .../storage/nvmf/NvmfStorageConstants.java      |  73 ++---
 .../crail/storage/nvmf/NvmfStorageServer.java   |  66 ++--
 .../crail/storage/nvmf/NvmfStorageTier.java     |   6 +-
 .../crail/storage/nvmf/client/NvmfFuture.java   | 149 +++++++++
 .../nvmf/client/NvmfRegisteredBufferCache.java  | 105 +++++++
 .../nvmf/client/NvmfStagingBufferCache.java     | 159 ++++++++++
 .../nvmf/client/NvmfStorageEndpoint.java        | 315 ++++++++++---------
 .../storage/nvmf/client/NvmfStorageFuture.java  | 116 -------
 .../nvmf/client/NvmfStorageUnalignedFuture.java | 110 -------
 .../client/NvmfStorageUnalignedRMWFuture.java   |  76 -----
 .../client/NvmfStorageUnalignedReadFuture.java  |  60 ----
 .../client/NvmfStorageUnalignedWriteFuture.java |  50 ---
 .../storage/nvmf/client/NvmfStorageUtils.java   |  48 ---
 .../nvmf/client/NvmfUnalignedWriteFuture.java   | 183 +++++++++++
 .../nvmf/client/NvmfStagingBufferCacheTest.java |  73 +++++
 18 files changed, 941 insertions(+), 765 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/pom.xml
----------------------------------------------------------------------
diff --git a/storage-nvmf/pom.xml b/storage-nvmf/pom.xml
index 6500632..20ad294 100644
--- a/storage-nvmf/pom.xml
+++ b/storage-nvmf/pom.xml
@@ -22,8 +22,9 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.ibm.disni</groupId>
-      <artifactId>disni</artifactId>
+      <groupId>com.ibm.jnvmf</groupId>
+      <artifactId>jnvmf</artifactId>
+      <version>1.0</version>
     </dependency>
     <dependency>
       <groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java
deleted file mode 100644
index 19142ec..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf;
-
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.memory.BufferCache;
-import org.apache.crail.memory.OffHeapBuffer;
-
-
-public class NvmfBufferCache extends BufferCache {
-	private static final int ALIGNMENT = 4096;
-	private NvmeEndpointGroup endpointGroup;
-
-	private List<ByteBuffer> bufferPool = new ArrayList<>();
-
-	public NvmfBufferCache() throws IOException {
-		super();
-		endpointGroup = NvmfStorageTier.getEndpointGroup();
-		if (endpointGroup == null) {
-			throw new IOException("NvmfStorageTier not initialized");
-		}
-	}
-
-	@Override
-	public String providerName() {
-		return "NvmfBufferCache";
-	}
-
-	@Override
-	public CrailBuffer allocateRegion() throws IOException {
-		ByteBuffer buffer = endpointGroup.allocateBuffer(CrailConstants.BUFFER_SIZE, ALIGNMENT);
-		bufferPool.add(buffer);
-		return OffHeapBuffer.wrap(buffer);
-	}
-
-	@Override
-	public void close() {
-		super.close();
-		for (ByteBuffer buffer : bufferPool) {
-			endpointGroup.freeBuffer(buffer);
-		}
-		bufferPool.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
index d270f5d..d9dd976 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,8 +19,7 @@
 
 package org.apache.crail.storage.nvmf;
 
-import java.io.IOException;
-
+import com.ibm.jnvmf.Nvme;
 import org.apache.crail.CrailBufferCache;
 import org.apache.crail.CrailStatistics;
 import org.apache.crail.conf.CrailConfiguration;
@@ -31,13 +30,23 @@ import org.apache.crail.storage.nvmf.client.NvmfStorageEndpoint;
 import org.apache.crail.utils.CrailUtils;
 import org.slf4j.Logger;
 
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.NvmeTransportType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class NvmfStorageClient implements StorageClient {
 	private static final Logger LOG = CrailUtils.getLogger();
-	private static NvmeEndpointGroup clientGroup;
-	private boolean initialized = false;
+	private static Nvme nvme;
+	private boolean initialized;
+	private List<StorageEndpoint> endpoints;
+	private CrailStatistics statistics;
+	private CrailBufferCache bufferCache;
+
+	public NvmfStorageClient() {
+		this.initialized = false;
+		this.endpoints = new CopyOnWriteArrayList<>();
+	}
 
 	public void init(CrailStatistics statistics, CrailBufferCache bufferCache, CrailConfiguration crailConfiguration,
 					 String[] args) throws IOException {
@@ -45,7 +54,9 @@ public class NvmfStorageClient implements StorageClient {
 			throw new IOException("NvmfStorageTier already initialized");
 		}
 		initialized = true;
-
+		this.statistics = statistics;
+		this.bufferCache = bufferCache;
+		LOG.info("Initialize Nvmf storage client");
 		NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
 	}
 
@@ -53,19 +64,23 @@ public class NvmfStorageClient implements StorageClient {
 		NvmfStorageConstants.printConf(logger);
 	}
 
-	public static NvmeEndpointGroup getEndpointGroup() {
-		if (clientGroup == null) {
-			clientGroup = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA},
-					NvmfStorageConstants.CLIENT_MEMPOOL);
+	public static Nvme getEndpointGroup() throws UnknownHostException {
+		if (nvme == null) {
+			nvme = new Nvme();
 		}
-		return clientGroup;
+		return nvme;
 	}
 
 	public synchronized StorageEndpoint createEndpoint(DataNodeInfo info) throws IOException {
-		return new NvmfStorageEndpoint(getEndpointGroup(), CrailUtils.datanodeInfo2SocketAddr(info));
+		StorageEndpoint endpoint = new NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache);
+		endpoints.add(endpoint);
+		return endpoint;
 	}
 
 	public void close() throws Exception {
+		for (StorageEndpoint endpoint : endpoints) {
+			endpoint.close();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
index d65e398..8ce132b 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,13 +19,9 @@
 
 package org.apache.crail.storage.nvmf;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import com.ibm.jnvmf.NamespaceIdentifier;
+import com.ibm.jnvmf.NvmeQualifiedName;
+import org.apache.commons.cli.*;
 import org.apache.crail.conf.CrailConfiguration;
 import org.apache.crail.conf.CrailConstants;
 import org.slf4j.Logger;
@@ -33,7 +29,6 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
 
 public class NvmfStorageConstants {
 
@@ -46,22 +41,19 @@ public class NvmfStorageConstants {
 	public static int PORT = 50025;
 
 	public static final String NQN_KEY = "nqn";
-	public static String NQN = "nqn.2016-06.io.spdk:cnode1";
+	public static NvmeQualifiedName NQN = new NvmeQualifiedName("nqn.2017-06.io.crail:cnode");
 
-	public static final String NAMESPACE_KEY = "namespace";
-	public static int NAMESPACE = 1;
+	/* this is a server property, the client will get the nsid from the namenode */
+	public static NamespaceIdentifier NAMESPACE = new NamespaceIdentifier(1);
 
 	public static final String ALLOCATION_SIZE_KEY = "allocationsize";
 	public static long ALLOCATION_SIZE = 1073741824; /* 1GB */
 
-	public static final String SERVER_MEMPOOL_KEY = "servermempool";
-	public static long SERVER_MEMPOOL = 256;
+	public static final String QUEUE_SIZE_KEY = "queueSize";
+	public static int QUEUE_SIZE = 64;
 
-	public static final String CLIENT_MEMPOOL_KEY = "clientmempool";
-	public static long CLIENT_MEMPOOL = 256;
-
-	public static final TimeUnit TIME_UNIT = TimeUnit.MINUTES;
-	public static final long TIME_OUT = 15;
+	public static final String STAGING_CACHE_SIZE_KEY = "stagingcachesize";
+	public static int STAGING_CACHE_SIZE = 262144;
 
 	private static String fullKey(String key) {
 		return PREFIX + "." + key;
@@ -72,12 +64,7 @@ public class NvmfStorageConstants {
 	}
 
 	public static void updateConstants(CrailConfiguration conf) throws UnknownHostException {
-		String arg = get(conf, NAMESPACE_KEY);
-		if (arg != null) {
-			NAMESPACE = Integer.parseInt(arg);
-		}
-
-		arg = get(conf, IP_ADDR_KEY);
+		String arg = get(conf, IP_ADDR_KEY);
 		if (arg != null) {
 			IP_ADDR = InetAddress.getByName(arg);
 		}
@@ -89,7 +76,7 @@ public class NvmfStorageConstants {
 
 		arg = get(conf, NQN_KEY);
 		if (arg != null) {
-			NQN = arg;
+			NQN = new NvmeQualifiedName(arg);
 		}
 
 		arg = get(conf, ALLOCATION_SIZE_KEY);
@@ -97,23 +84,23 @@ public class NvmfStorageConstants {
 			ALLOCATION_SIZE = Long.parseLong(arg);
 		}
 
-		arg = get(conf, SERVER_MEMPOOL_KEY);
+		arg = get(conf, QUEUE_SIZE_KEY);
 		if (arg != null) {
-			SERVER_MEMPOOL = Long.parseLong(arg);
+			QUEUE_SIZE = Integer.parseInt(arg);
 		}
 
-		arg = get(conf, CLIENT_MEMPOOL_KEY);
+		arg = get(conf, STAGING_CACHE_SIZE_KEY);
 		if (arg != null) {
-			CLIENT_MEMPOOL = Long.parseLong(arg);
+			STAGING_CACHE_SIZE = Integer.parseInt(arg);
 		}
 	}
 
-	public static void verify() throws IOException {
-		if (NAMESPACE <= 0){
-			throw new IOException("Namespace must be > 0");
-		}
+	public static void verify() {
 		if (ALLOCATION_SIZE % CrailConstants.BLOCK_SIZE != 0){
-			throw new IOException("allocationsize must be multiple of crail.blocksize");
+			throw new IllegalArgumentException("allocationsize must be multiple of crail.blocksize");
+		}
+		if (QUEUE_SIZE < 0) {
+			throw new IllegalArgumentException("Queue size negative");
 		}
 	}
 
@@ -123,12 +110,10 @@ public class NvmfStorageConstants {
 		}
 		logger.info(fullKey(PORT_KEY) + " " + PORT);
 		logger.info(fullKey(NQN_KEY) + " " + NQN);
-		logger.info(fullKey(NAMESPACE_KEY) + " " + NAMESPACE);
 		logger.info(fullKey(ALLOCATION_SIZE_KEY) + " " + ALLOCATION_SIZE);
-		logger.info(fullKey(SERVER_MEMPOOL_KEY) + " " + SERVER_MEMPOOL);
-		logger.info(fullKey(CLIENT_MEMPOOL_KEY) + " " + CLIENT_MEMPOOL);
+		logger.info(fullKey(QUEUE_SIZE_KEY) + " " + QUEUE_SIZE);
 	}
-	
+
 	public static void parseCmdLine(CrailConfiguration crailConfiguration, String[] args) throws IOException {
 		NvmfStorageConstants.updateConstants(crailConfiguration);
 
@@ -140,10 +125,12 @@ public class NvmfStorageConstants {
 				bindIp.setRequired(true);
 			}
 			Option port = Option.builder("p").desc("target port").hasArg().type(Number.class).build();
+			Option namespace = Option.builder("n").desc("namespace id").hasArg().type(Number.class).build();
 			Option nqn = Option.builder("nqn").desc("target subsystem NQN").hasArg().build();
 			options.addOption(bindIp);
 			options.addOption(port);
 			options.addOption(nqn);
+			options.addOption(namespace);
 			CommandLineParser parser = new DefaultParser();
 			HelpFormatter formatter = new HelpFormatter();
 			CommandLine line = null;
@@ -152,6 +139,10 @@ public class NvmfStorageConstants {
 				if (line.hasOption(port.getOpt())) {
 					NvmfStorageConstants.PORT = ((Number) line.getParsedOptionValue(port.getOpt())).intValue();
 				}
+				if (line.hasOption(namespace.getOpt())) {
+					NvmfStorageConstants.NAMESPACE = new
+							NamespaceIdentifier(((Number) line.getParsedOptionValue(namespace.getOpt())).intValue());
+				}
 			} catch (ParseException e) {
 				System.err.println(e.getMessage());
 				formatter.printHelp("NVMe storage tier", options);
@@ -161,10 +152,10 @@ public class NvmfStorageConstants {
 				NvmfStorageConstants.IP_ADDR = InetAddress.getByName(line.getOptionValue(bindIp.getOpt()));
 			}
 			if (line.hasOption(nqn.getOpt())) {
-				NvmfStorageConstants.NQN = line.getOptionValue(nqn.getOpt());
+				NvmfStorageConstants.NQN = new NvmeQualifiedName(line.getOptionValue(nqn.getOpt()));
 			}
 		}
 
 		NvmfStorageConstants.verify();
-	}	
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
index 3fd958e..f8b0d8c 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,10 +19,7 @@
 
 package org.apache.crail.storage.nvmf;
 
-import com.ibm.disni.nvmef.NvmeEndpoint;
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.NvmeTransportType;
-
+import com.ibm.jnvmf.*;
 import org.apache.crail.conf.CrailConfiguration;
 import org.apache.crail.storage.StorageResource;
 import org.apache.crail.storage.StorageServer;
@@ -31,19 +28,19 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
+import java.util.List;
 
 public class NvmfStorageServer implements StorageServer {
 	private static final Logger LOG = CrailUtils.getLogger();
 
 	private boolean isAlive;
 	private long alignedSize;
-	private long offset;
+	private long address;
 	private boolean initialized = false;
-	private NvmeEndpoint endpoint;
+	private Controller controller;
 
 	public NvmfStorageServer() {}
-	
+
 	public void init(CrailConfiguration crailConfiguration, String[] args) throws Exception {
 		if (initialized) {
 			throw new IOException("NvmfStorageTier already initialized");
@@ -51,23 +48,40 @@ public class NvmfStorageServer implements StorageServer {
 		initialized = true;
 		NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
 
-		NvmeEndpointGroup group = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA}, NvmfStorageConstants.SERVER_MEMPOOL);
-		endpoint = group.createEndpoint();
-
-		URI uri = new URI("nvmef://" + NvmfStorageConstants.IP_ADDR.getHostAddress() + ":" + NvmfStorageConstants.PORT +
-					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=" + NvmfStorageConstants.NQN);
-		endpoint.connect(uri);
-
-		long namespaceSize = endpoint.getNamespaceSize();
+		Nvme nvme = new Nvme();
+		NvmfTransportId transportId = new NvmfTransportId(
+				new InetSocketAddress(NvmfStorageConstants.IP_ADDR, NvmfStorageConstants.PORT),
+				NvmfStorageConstants.NQN);
+		controller = nvme.connect(transportId);
+		controller.getControllerConfiguration().setEnable(true);
+		controller.syncConfiguration();
+		controller.waitUntilReady();
+
+		List<Namespace> namespaces = controller.getActiveNamespaces();
+		Namespace namespace = null;
+		for (Namespace n : namespaces) {
+			if (n.getIdentifier().equals(NvmfStorageConstants.NAMESPACE)) {
+				namespace = n;
+				break;
+			}
+		}
+		if (namespace == null) {
+			throw new IllegalArgumentException("No namespace with id " + NvmfStorageConstants.NAMESPACE +
+					" at controller " + transportId.toString());
+		}
+		IdentifyNamespaceData namespaceData = namespace.getIdentifyNamespaceData();
+		LBAFormat lbaFormat = namespaceData.getFormattedLBASize();
+		int dataSize = lbaFormat.getLBADataSize().toInt();
+		long namespaceSize = dataSize * namespaceData.getNamespaceCapacity();
 		alignedSize = namespaceSize - (namespaceSize % NvmfStorageConstants.ALLOCATION_SIZE);
-		offset = 0;
+		address = 0;
 
 		isAlive = true;
-	}	
+	}
 
 	@Override
 	public void printConf(Logger log) {
-		NvmfStorageConstants.printConf(log);		
+		NvmfStorageConstants.printConf(log);
 	}
 
 	public void run() {
@@ -75,7 +89,7 @@ public class NvmfStorageServer implements StorageServer {
 		while (isAlive) {
 			try {
 				Thread.sleep(1000 /* ms */);
-				endpoint.keepAlive();
+				controller.keepAlive();
 			} catch (Exception e) {
 				e.printStackTrace();
 				isAlive = false;
@@ -86,15 +100,15 @@ public class NvmfStorageServer implements StorageServer {
 	@Override
 	public StorageResource allocateResource() throws Exception {
 		StorageResource resource = null;
-		
+
 		if (alignedSize > 0){
 			LOG.info("new block, length " + NvmfStorageConstants.ALLOCATION_SIZE);
-			LOG.debug("block stag 0, offset " + offset + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
+			LOG.debug("block stag 0, address " + address + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
 			alignedSize -= NvmfStorageConstants.ALLOCATION_SIZE;
-			resource = StorageResource.createResource(offset, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
-			offset += NvmfStorageConstants.ALLOCATION_SIZE;
+			resource = StorageResource.createResource(address, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
+			address += NvmfStorageConstants.ALLOCATION_SIZE;
 		}
-		
+
 		return resource;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
index 1e37b44..749a331 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -26,9 +26,9 @@ import org.slf4j.Logger;
 
 public class NvmfStorageTier extends NvmfStorageClient implements StorageTier {
 	private static final Logger LOG = CrailUtils.getLogger();
-	
+
 	public StorageServer launchServer() throws Exception {
-		LOG.info("initalizing NVMf datanode");
+		LOG.info("initalizing NVMf storage tier");
 		NvmfStorageServer storageServer = new NvmfStorageServer();
 		return storageServer;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
new file mode 100644
index 0000000..0a6c9b4
--- /dev/null
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import com.ibm.jnvmf.*;
+import org.apache.crail.storage.StorageFuture;
+import org.apache.crail.storage.StorageResult;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class NvmfFuture<Command extends NvmIOCommand<? extends NvmIOCommandCapsule>> implements StorageFuture, OperationCallback {
+	private final NvmfStorageEndpoint endpoint;
+	private final Command command;
+	private final Queue<Command> operations;
+	private boolean done;
+	private RdmaException exception;
+	private final StorageResult storageResult;
+	private final Response<NvmResponseCapsule> response;
+	private int completed;
+
+	NvmfFuture(NvmfStorageEndpoint endpoint, Command command, Response<NvmResponseCapsule> response,
+			   Queue<Command> operations, int length) {
+		this.endpoint = endpoint;
+		this.command = command;
+		this.operations = operations;
+		this.done = false;
+		this.storageResult = () -> length;
+		this.response = response;
+		this.completed = 0;
+	}
+
+	@Override
+	public boolean isSynchronous() {
+		return false;
+	}
+
+	@Override
+	public boolean cancel(boolean b) {
+		return false;
+	}
+
+	@Override
+	public boolean isCancelled() {
+		return false;
+	}
+
+	@Override
+	public boolean isDone() {
+		if (!done) {
+			try {
+				endpoint.poll();
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+		return done;
+	}
+
+	private final void checkStatus() throws ExecutionException {
+		if (exception != null) {
+			throw new ExecutionException(exception);
+		}
+		NvmCompletionQueueEntry cqe = response.getResponseCapsule().getCompletionQueueEntry();
+		StatusCode.Value statusCode = cqe.getStatusCode();
+		if (statusCode != null) {
+			if (!statusCode.equals(GenericStatusCode.getInstance().SUCCESS)) {
+				throw new ExecutionException(new UnsuccessfulComandException(cqe));
+			}
+		}
+	}
+
+	@Override
+	public StorageResult get() throws InterruptedException, ExecutionException {
+		try {
+			return get(2, TimeUnit.MINUTES);
+		} catch (TimeoutException e) {
+			throw new ExecutionException(e);
+		}
+	}
+
+	@Override
+	public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+		if (!done) {
+			long start = System.nanoTime();
+			long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
+			boolean waitTimeOut;
+			do {
+				try {
+					endpoint.poll();
+				} catch (IOException e) {
+					throw new ExecutionException(e);
+				}
+				waitTimeOut = System.nanoTime() > end;
+			} while (!done && !waitTimeOut);
+			if (!done && waitTimeOut) {
+				throw new TimeoutException("poll wait time out!");
+			}
+		}
+		checkStatus();
+		return storageResult;
+	}
+
+	@Override
+	public void onStart() {
+
+	}
+
+	@Override
+	public void onComplete() {
+		assert !done;
+		assert completed < 2;
+		if (++completed == 2) {
+			/* we need to complete command and response */
+			operations.add(command);
+			this.done = true;
+			endpoint.putOperation();
+		}
+	}
+
+	@Override
+	public void onFailure(RdmaException e) {
+		assert !done;
+		this.operations.add(command);
+		this.exception = e;
+		this.done = true;
+		endpoint.putOperation();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
new file mode 100644
index 0000000..0a364e8
--- /dev/null
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import com.ibm.jnvmf.Freeable;
+import com.ibm.jnvmf.KeyedNativeBuffer;
+import com.ibm.jnvmf.NativeByteBuffer;
+import com.ibm.jnvmf.QueuePair;
+import org.apache.crail.CrailBuffer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+class NvmfRegisteredBufferCache implements Freeable {
+	private final QueuePair queuePair;
+	private final Map<CrailBuffer, KeyedNativeBuffer> bufferMap;
+	private final Map<Long, KeyedNativeBuffer> regionMap;
+	private boolean valid;
+
+	public NvmfRegisteredBufferCache(QueuePair queuePair) {
+		this.queuePair = queuePair;
+		this.bufferMap = new ConcurrentHashMap<>();
+		this.regionMap = new ConcurrentHashMap<>();
+		this.valid = true;
+	}
+
+	static class Buffer extends NativeByteBuffer implements KeyedNativeBuffer {
+		private final KeyedNativeBuffer registeredRegionBuffer;
+
+		Buffer(CrailBuffer buffer, KeyedNativeBuffer registeredRegionBuffer) {
+			super(buffer.getByteBuffer());
+			this.registeredRegionBuffer = registeredRegionBuffer;
+		}
+
+		@Override
+		public int getRemoteKey() {
+			return registeredRegionBuffer.getRemoteKey();
+		}
+
+		@Override
+		public int getLocalKey() {
+			return registeredRegionBuffer.getLocalKey();
+		}
+	}
+
+	KeyedNativeBuffer get(CrailBuffer buffer) throws IOException {
+		KeyedNativeBuffer keyedNativeBuffer = bufferMap.get(buffer);
+		if (keyedNativeBuffer == null) {
+			CrailBuffer regionBuffer = buffer.getRegion();
+			keyedNativeBuffer = regionMap.get(regionBuffer.address());
+			if (keyedNativeBuffer == null) {
+				/* region has not been registered yet */
+				keyedNativeBuffer = queuePair.registerMemory(regionBuffer.getByteBuffer());
+				KeyedNativeBuffer prevKeyedNativeBuffer =
+						regionMap.putIfAbsent(keyedNativeBuffer.getAddress(), keyedNativeBuffer);
+				if (prevKeyedNativeBuffer != null) {
+					/* someone registered the same region in parallel */
+					keyedNativeBuffer.free();
+					keyedNativeBuffer = prevKeyedNativeBuffer;
+				}
+			}
+			keyedNativeBuffer = new Buffer(buffer, keyedNativeBuffer);
+			KeyedNativeBuffer prevKeyedNativeBuffer =
+					bufferMap.putIfAbsent(buffer, keyedNativeBuffer);
+			if (prevKeyedNativeBuffer != null) {
+				/* someone added the same buffer in parallel */
+				keyedNativeBuffer.free();
+				keyedNativeBuffer = prevKeyedNativeBuffer;
+			}
+		}
+		return keyedNativeBuffer;
+	}
+
+
+	@Override
+	public void free() throws IOException {
+		for (KeyedNativeBuffer buffer : bufferMap.values()) {
+			buffer.free();
+		}
+		valid = false;
+	}
+
+	@Override
+	public boolean isValid() {
+		return valid;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
new file mode 100644
index 0000000..b4f4dc3
--- /dev/null
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.storage.StorageFuture;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NvmfStagingBufferCache { // Bounded pool of lbaDataSize-sized staging slices, tracked per aligned remote address.
+	private final Map<Long, BufferCacheEntry> remoteAddressMap; // entries handed out by get(), keyed by aligned remote address
+	private final Queue<CrailBuffer> freeBuffers; // slices ready to be handed out
+	private int buffersLeft; // slices that may still be carved; after construction only touched inside synchronized allocateFreeBuffers()
+	private final int lbaDataSize; // length of every staging slice
+	private final CrailBufferCache bufferCache; // source of the backing buffers that get sliced
+
+	private final CrailBufferCache getBufferCache() {
+		return bufferCache;
+	}
+
+	NvmfStagingBufferCache(CrailBufferCache bufferCache, int maxEntries, int lbaDataSize) { // maxEntries bounds the number of slices; both sizes must be > 0
+		if (maxEntries <= 0) {
+			throw new IllegalArgumentException("maximum entries <= 0");
+		}
+		if (lbaDataSize <= 0) {
+			throw new IllegalArgumentException("LBA data size <= 0");
+		}
+		this.remoteAddressMap = new ConcurrentHashMap<>(maxEntries);
+		this.freeBuffers = new ArrayBlockingQueue<>(maxEntries);
+		this.buffersLeft = maxEntries;
+		this.lbaDataSize = lbaDataSize;
+		this.bufferCache = bufferCache;
+	}
+
+	synchronized void allocateFreeBuffers() throws Exception { // Refills freeBuffers: slices a new buffer while budget remains, otherwise reclaims one unused entry.
+		if (!freeBuffers.isEmpty()) { // buffers are available again (e.g. refilled by a concurrent caller) - nothing to do
+			return;
+		}
+		if (buffersLeft == 0) {
+			/* TODO: make sure this happens rarely */
+			Iterator<BufferCacheEntry> iterator = remoteAddressMap.values().iterator();
+			while (iterator.hasNext()) {
+				BufferCacheEntry currentEntry = iterator.next();
+				if (currentEntry.tryFree()) { // succeeds only when the entry's pending count is 0
+					iterator.remove();
+					freeBuffers.add(currentEntry.getBuffer());
+					return; // reclaiming a single slice is enough for the caller's retry
+				}
+			}
+			throw new OutOfMemoryError(); // budget exhausted and every entry is still held
+		}
+
+		CrailBuffer buffer = getBufferCache().allocateBuffer();
+		if (buffer == null) {
+			throw new OutOfMemoryError();
+		}
+		if (buffer.capacity() < lbaDataSize) {
+			throw new IllegalArgumentException("Slice size smaller LBA data size");
+		}
+		while (buffer.remaining() >= lbaDataSize && buffersLeft > 0) { // carve slices until the buffer or the budget is exhausted
+			buffer.limit(buffer.position() + lbaDataSize);
+			freeBuffers.add(buffer.slice());
+			buffer.position(buffer.limit()); // NOTE(review): the next iteration tests remaining() against the limit just set here - confirm more than one slice is produced per buffer
+			buffersLeft--;
+		}
+	}
+
+	static class BufferCacheEntry { // A staging buffer with a use count (pending) and an attached StorageFuture.
+		private final CrailBuffer buffer;
+		private final AtomicInteger pending; // >0: held, 0: unused, -1: freed by tryFree()
+		private StorageFuture future; // NOTE(review): plain field - cross-thread visibility is not guaranteed by this class, confirm callers synchronize
+
+		BufferCacheEntry(CrailBuffer buffer) {
+			this.buffer = buffer;
+			this.pending = new AtomicInteger(1); // the creator holds the first reference
+		}
+
+		public StorageFuture getFuture() {
+			return future;
+		}
+
+		public void setFuture(StorageFuture future) {
+			this.future = future;
+		}
+
+		void put() { // drops one reference
+			pending.decrementAndGet();
+		}
+
+		boolean tryGet() { // takes one reference; fails if the entry was already freed (pending < 0)
+			int prevPending;
+			do {
+				prevPending = pending.get();
+				if (prevPending < 0) {
+					return false;
+				}
+			} while (!pending.compareAndSet(prevPending, prevPending + 1)); // retry if another thread changed the count meanwhile
+			return true;
+		}
+
+		boolean tryFree() { // marks the entry freed (-1) only if nobody holds it (pending == 0)
+			return pending.compareAndSet(0, -1);
+		}
+
+		CrailBuffer getBuffer() {
+			return buffer;
+		}
+
+
+	}
+
+	BufferCacheEntry get(long alignedRemoteAddress) throws Exception { // Takes a free slice (allocating or reclaiming if needed) and registers a new entry for the address.
+		CrailBuffer buffer;
+		do {
+			buffer = freeBuffers.poll();
+			if (buffer == null) {
+				allocateFreeBuffers(); // may throw OutOfMemoryError when nothing can be allocated or reclaimed
+			}
+		} while (buffer == null);
+
+		BufferCacheEntry entry = new BufferCacheEntry(buffer);
+		BufferCacheEntry prevEntry = remoteAddressMap.putIfAbsent(alignedRemoteAddress, entry);
+		if (prevEntry != null) {
+			throw new IllegalStateException(); // NOTE(review): the polled buffer is not returned to freeBuffers on this path - looks like a leak, confirm
+		}
+		return entry;
+	}
+
+	BufferCacheEntry getExisting(long alignedRemoteAddress) { // Returns the entry for the address with one more reference taken, or null if absent or already freed.
+		BufferCacheEntry entry = remoteAddressMap.get(alignedRemoteAddress);
+		if (entry != null && !entry.tryGet()) {
+			entry = null;
+		}
+		return entry;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index b4b1054..e1430af 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,208 +19,223 @@
 
 package org.apache.crail.storage.nvmf.client;
 
-import com.ibm.disni.nvmef.NvmeCommand;
-import com.ibm.disni.nvmef.NvmeEndpoint;
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.IOCompletion;
-
+import com.ibm.jnvmf.*;
 import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.CrailStatistics;
 import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.memory.BufferCache;
 import org.apache.crail.metadata.BlockInfo;
+import org.apache.crail.metadata.DataNodeInfo;
 import org.apache.crail.storage.StorageEndpoint;
 import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.nvmf.NvmfBufferCache;
 import org.apache.crail.storage.nvmf.NvmfStorageConstants;
 import org.apache.crail.utils.CrailUtils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.*;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class NvmfStorageEndpoint implements StorageEndpoint {
 	private static final Logger LOG = CrailUtils.getLogger();
 
-	private final InetSocketAddress inetSocketAddress;
-	private final NvmeEndpoint endpoint;
-	private final int sectorSize;
-	private final BufferCache cache;
-	private final BlockingQueue<NvmeCommand> freeCommands;
-	private final NvmeCommand[] commands;
-	private final NvmfStorageFuture[] futures;
-	private final ThreadLocal<long[]> completed;
-	private final int ioQeueueSize;
-
-	public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException {
-		this.inetSocketAddress = inetSocketAddress;
-		endpoint = group.createEndpoint();
+	private final Controller controller;
+	private final IOQueuePair queuePair;
+	private final int lbaDataSize;
+	private final long namespaceCapacity;
+	private final NvmfRegisteredBufferCache registeredBufferCache;
+	private final NvmfStagingBufferCache stagingBufferCache;
+	private final CrailStatistics statistics;
+
+	private final Queue<NvmWriteCommand> writeCommands;
+	private final Queue<NvmReadCommand> readCommands;
+
+	private final AtomicInteger outstandingOperations;
+
+	public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics, // Connects to the NVMf target described by info and sets up queue pair, commands and caches.
+							   CrailBufferCache bufferCache) throws IOException {
+		InetSocketAddress inetSocketAddress = new InetSocketAddress(
+				InetAddress.getByAddress(info.getIpAddress()), info.getPort());
+		// XXX FIXME: nsid from datanodeinfo
+		NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress,
+				new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort())); // subsystem name is the configured NQN with the port appended
+		LOG.info("Connecting to NVMf target at " + transportId.toString());
+		controller = nvme.connect(transportId);
+		controller.getControllerConfiguration().setEnable(true);
+		controller.syncConfiguration();
 		try {
-			URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
-					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1");
-			LOG.info("Connecting to " + url.toString());
-			endpoint.connect(url);
-		} catch (URISyntaxException e) {
-			//FIXME
-			e.printStackTrace();
+			controller.waitUntilReady();
+		} catch (TimeoutException e) {
+			throw new IOException(e); // surface the timeout through the declared IOException
 		}
-		sectorSize = endpoint.getSectorSize();
-		cache = new NvmfBufferCache();
-		ioQeueueSize = endpoint.getIOQueueSize();
-		freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
-		commands = new NvmeCommand[ioQeueueSize];
-		for (int i = 0; i < ioQeueueSize; i++) {
-			NvmeCommand command = endpoint.newCommand();
-			command.setId(i);
-			commands[i] = command;
-			freeCommands.add(command);
+		IdentifyControllerData identifyControllerData = controller.getIdentifyControllerData();
+		if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) { // a slice must not exceed the controller's maximum transfer size
+			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" +
+					identifyControllerData.getMaximumDataTransferSize() + ")");
 		}
-		futures = new NvmfStorageFuture[ioQeueueSize];
-		completed = new ThreadLocal<long[]>() {
-			public long[] initialValue() {
-				return new long[ioQeueueSize];
+		List<Namespace> namespaces = controller.getActiveNamespaces();
+		//TODO: poll nsid in datanodeinfo
+		NamespaceIdentifier namespaceIdentifier = new NamespaceIdentifier(1); // namespace id is hard-coded to 1 for now
+		Namespace namespace = null;
+		for (Namespace n : namespaces) {
+			if (n.getIdentifier().equals(namespaceIdentifier)) {
+				namespace = n;
+				break;
 			}
-		};
+		}
+		if (namespace == null) {
+			throw new IllegalArgumentException("No namespace with id " + namespaceIdentifier +
+					" at controller " + transportId.toString());
+		}
+		IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData();
+		lbaDataSize = identifyNamespaceData.getFormattedLBASize().getLBADataSize().toInt();
+		if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) { // slices must cover whole logical blocks
+			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
+					" is not a multiple of LBA data size (" + lbaDataSize + ")");
+		}
+		namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize; // presumably logical blocks times block size, i.e. bytes - confirm units
+		this.queuePair = controller.createIOQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
+				SubmissionQueueEntry.SIZE);
+
+		this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+		this.readCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+		for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) { // pre-create QUEUE_SIZE write and QUEUE_SIZE read commands for reuse
+			NvmWriteCommand writeCommand = new NvmWriteCommand(queuePair);
+			writeCommand.setSendInline(true);
+			writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+			writeCommands.add(writeCommand);
+			NvmReadCommand readCommand = new NvmReadCommand(queuePair);
+			readCommand.setSendInline(true);
+			readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+			readCommands.add(readCommand);
+		}
+		this.registeredBufferCache = new NvmfRegisteredBufferCache(queuePair);
+		this.outstandingOperations = new AtomicInteger(0);
+		this.stagingBufferCache = new NvmfStagingBufferCache(bufferCache,
+				NvmfStorageConstants.STAGING_CACHE_SIZE, getLBADataSize()); // staging slices are exactly one logical block long
+		this.statistics = statistics;
 	}
 
-	public int getSectorSize() {
-		return sectorSize;
+	public int getLBADataSize() { // Returns the namespace's LBA data size as determined when the endpoint was constructed.
+		return lbaDataSize;
+	}
+
+	public long getNamespaceCapacity() { // Returns the capacity computed at construction (reported namespace capacity times LBA data size).
+		return namespaceCapacity;
 	}
 
 	enum Operation {
 		WRITE,
-		READ;
+		READ
 	}
 
-	public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset)
-			throws IOException, InterruptedException {
-		int length = buffer.remaining();
-		if (length > CrailConstants.BLOCK_SIZE){
-			throw new IOException("write size too large " + length);
-		}
-		if (length <= 0){
-			throw new IOException("write size too small, len " + length);
-		}
-		if (buffer.position() < 0){
-			throw new IOException("local offset too small " + buffer.position());
-		}
-		if (remoteOffset < 0){
-			throw new IOException("remote offset too small " + remoteOffset);
-		}
+	void putOperation() { // Gives back one operation slot by decrementing the outstanding-operations counter.
+		outstandingOperations.decrementAndGet();
+	}
 
-		if (remoteMr.getAddr() + remoteOffset + length > endpoint.getNamespaceSize()){
-			long tmpAddr = remoteMr.getAddr() + remoteOffset + length;
-			throw new IOException("remote fileOffset + remoteOffset + len = " + tmpAddr + " - size = " +
-					endpoint.getNamespaceSize());
+	private boolean tryGetOperation() { // Tries once to reserve an operation slot; false if QUEUE_SIZE slots are taken or the CAS lost a race.
+		int outstandingOperationsOld = outstandingOperations.get();
+		if (outstandingOperationsOld != NvmfStorageConstants.QUEUE_SIZE) {
+			return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1); // single attempt; the caller is expected to retry
 		}
+		return false;
+	}
 
-//		LOG.info("op = " + op.name() +
-//				", position = " + buffer.position() +
-//				", localOffset = " + buffer.position() +
-//				", remoteOffset = " + remoteOffset +
-//				", remoteAddr = " + remoteMr.getAddr() +
-//				", length = " + length);
-
-		NvmeCommand command = freeCommands.poll();
-		while (command == null) {
-			poll();
-			command = freeCommands.poll();
-		}
+	private static int divCeil(int a, int b) { // Integer division rounded up; assumes a >= 0, b > 0 and that a + b - 1 does not overflow.
+		return (a + b - 1) / b;
+	}
 
-		boolean aligned = NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
-				&& NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
-		long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, remoteOffset, sectorSize);
-		StorageFuture future = null;
-		if (aligned) {
-//			LOG.info("aligned");
-			command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
-			switch(op) {
-				case READ:
-					command.read();
-					break;
-				case WRITE:
-					command.write();
-					break;
-			}
-			future = futures[(int)command.getId()] = new NvmfStorageFuture(this, length);
-			command.execute();
-		} else {
-//			LOG.info("unaligned");
-			long alignedLength = NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
+	private int getNumLogicalBlocks(CrailBuffer buffer) { // Number of LBA-sized blocks needed to cover buffer.remaining(), rounded up.
+		return divCeil(buffer.remaining(), getLBADataSize());
+	}
 
-			CrailBuffer stagingBuffer = cache.allocateBuffer();
-			stagingBuffer.limit((int)alignedLength);
+	StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException { // Submits one read/write; unaligned writes go through NvmfUnalignedWriteFuture, reads with an unaligned start are rejected.
+		assert blockInfo.getAddr() + remoteOffset + buffer.remaining() <= getNamespaceCapacity(); // NOTE(review): asserts are inactive unless the JVM runs with -ea, so these bounds are then unchecked
+		assert remoteOffset >= 0;
+		assert buffer.remaining() <= CrailConstants.BLOCK_SIZE;
+
+		long startingAddress = blockInfo.getAddr() + remoteOffset;
+		if (startingAddress % getLBADataSize() != 0 ||
+				((startingAddress + buffer.remaining()) % getLBADataSize() != 0 && op == Operation.WRITE)) { // unaligned start, or (writes only) unaligned end
+			if (op == Operation.READ) {
+				throw new IOException("Unaligned read access is not supported. Address (" + startingAddress +
+						") needs to be multiple of LBA data size " + getLBADataSize());
+			}
 			try {
-				switch(op) {
-					case READ: {
-						NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, (int)alignedLength);
-						command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
-						future = new NvmfStorageUnalignedReadFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer);
-						break;
-					}
-					case WRITE: {
-						if (NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0) {
-							// Do not read if the offset is aligned to sector size
-							int sizeToWrite = length;
-							stagingBuffer.put(buffer.getByteBuffer());
-							stagingBuffer.position(0);
-							command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).write().execute();
-							future = futures[(int)command.getId()] = new NvmfStorageUnalignedWriteFuture(this, sizeToWrite, stagingBuffer);
-						} else {
-							// RMW but append only file system allows only reading last sector
-							// and dir entries are sector aligned
-							stagingBuffer.limit(sectorSize);
-							NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, sectorSize);
-							command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
-							future = new NvmfStorageUnalignedRMWFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer);
-						}
-						break;
-					}
-				}
-			} catch (NoSuchFieldException e) {
-				throw new IOException(e);
-			} catch (IllegalAccessException e) {
+				return new NvmfUnalignedWriteFuture(this, buffer, blockInfo, remoteOffset); // no operation slot has been reserved yet on this path
+			} catch (Exception e) {
 				throw new IOException(e);
 			}
 		}
 
+		if (!tryGetOperation()) { // no slot obtained: poll the queue pair until a reservation succeeds
+			do {
+				poll();
+			} while (!tryGetOperation());
+		}
+
+		NvmIOCommand<? extends NvmIOCommandCapsule> command;
+		NvmfFuture<?> future;
+		Response<NvmResponseCapsule> response;
+		if (op == Operation.READ) {
+			NvmReadCommand readCommand = readCommands.remove(); // the future is handed the queue, presumably to return the command on completion - confirm in NvmfFuture
+			response = readCommand.newResponse();
+			future = new NvmfFuture<>(this, readCommand, response, readCommands, buffer.remaining());
+			command = readCommand;
+		} else {
+			NvmWriteCommand writeCommand = writeCommands.remove();
+			response = writeCommand.newResponse();
+			future = new NvmfFuture<>(this, writeCommand, response, writeCommands, buffer.remaining());
+			command = writeCommand;
+		}
+		command.setCallback(future); // the same future object is the callback for both command and response
+		response.setCallback(future);
+
+		NvmIOCommandSQE sqe = command.getCommandCapsule().getSubmissionQueueEntry();
+		long startingLBA = startingAddress / getLBADataSize();
+		sqe.setStartingLBA(startingLBA);
+		/* TODO: on read this potentially overwrites data beyond the set limit */
+		short numLogicalBlocks = (short)(getNumLogicalBlocks(buffer) - 1); // zero-based count (hence -1 here, +1 below); NOTE(review): the short cast wraps above 32768 blocks - confirm buffer sizes rule this out
+		sqe.setNumberOfLogicalBlocks(numLogicalBlocks);
+		KeyedNativeBuffer registeredBuffer = registeredBufferCache.get(buffer); // NOTE(review): if this throws, the reserved slot and the removed command are not given back - confirm
+		registeredBuffer.position(buffer.position());
+		registeredBuffer.limit(registeredBuffer.position() + (numLogicalBlocks + 1) * getLBADataSize()); // length rounded up to whole logical blocks, may exceed buffer.remaining()
+		command.getCommandCapsule().setSGLDescriptor(registeredBuffer);
+
+		command.execute(response);
+
 		return future;
 	}
 
-	public StorageFuture write(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset)
-			throws IOException, InterruptedException {
+	public StorageFuture write(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException { // Delegates to Op with Operation.WRITE.
 		return Op(Operation.WRITE, buffer, blockInfo, remoteOffset);
 	}
 
-	public StorageFuture read(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset)
-			throws IOException, InterruptedException {
+	public StorageFuture read(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException { // Delegates to Op with Operation.READ.
 		return Op(Operation.READ, buffer, blockInfo, remoteOffset);
 	}
 
 	void poll() throws IOException {
-		long[] ca = completed.get();
-		int numberCompletions = endpoint.processCompletions(ca);
-		for (int i = 0; i < numberCompletions; i++) {
-			int idx = (int)ca[i];
-			NvmeCommand command = commands[idx];
-			IOCompletion completion = command.getCompletion();
-			completion.done();
-			futures[idx].signal(completion.getStatusCodeType(), completion.getStatusCode());
-			freeCommands.add(command);
-		}
-	}
-
-	void putBuffer(CrailBuffer buffer) throws IOException {
-		cache.freeBuffer(buffer);
+		queuePair.poll(); // presumably processes completions and fires the registered callbacks - confirm against jNVMf
 	}
 
 	public void close() throws IOException, InterruptedException {
-		endpoint.close();
+		registeredBufferCache.free(); // buffer registrations are released before the controller
+		controller.free(); // NOTE(review): queuePair is not freed explicitly here - confirm controller.free() covers it
+
 	}
 
 	public boolean isLocal() {
 		return false;
 	}
+
+	NvmfStagingBufferCache getStagingBufferCache() { // Package-private accessor for the staging buffer cache created in the constructor.
+		return stagingBufferCache;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java
deleted file mode 100644
index 9dbab36..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import com.ibm.disni.nvmef.spdk.NvmeGenericCommandStatusCode;
-import com.ibm.disni.nvmef.spdk.NvmeStatusCodeType;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.StorageResult;
-import org.apache.crail.storage.nvmf.NvmfStorageConstants;
-
-public class NvmfStorageFuture implements StorageFuture, StorageResult {
-
-	protected final NvmfStorageEndpoint endpoint;
-	private final int len;
-	private Exception exception;
-	private volatile boolean done;
-
-	public NvmfStorageFuture(NvmfStorageEndpoint endpoint, int len) {
-		this.endpoint = endpoint;
-		this.len = len;
-	}
-
-	public int getLen() {
-		return len;
-	}
-
-	public boolean cancel(boolean b) {
-		return false;
-	}
-
-	public boolean isCancelled() {
-		return false;
-	}
-
-	void signal(NvmeStatusCodeType statusCodeType, int statusCode) {
-		if (statusCodeType != NvmeStatusCodeType.GENERIC &&
-				statusCode != NvmeGenericCommandStatusCode.SUCCESS.getNumVal()) {
-			exception = new ExecutionException("Error: " + statusCodeType.name() + " - " + statusCode) {};
-		}
-		done = true;
-	}
-
-	public boolean isDone() {
-		if (!done) {
-			try {
-				endpoint.poll();
-			} catch (IOException e) {
-				exception = e;
-			}
-		}
-		return done;
-	}
-
-	public StorageResult get() throws InterruptedException, ExecutionException {
-		try {
-			return get(NvmfStorageConstants.TIME_OUT, NvmfStorageConstants.TIME_UNIT);
-		} catch (TimeoutException e) {
-			throw new ExecutionException(e);
-		}
-	}
-
-	public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
-		if (exception != null) {
-			throw new ExecutionException(exception);
-		}
-		if (!done) {
-			long start = System.nanoTime();
-			long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
-			boolean waitTimeOut;
-			do {
-				try {
-					endpoint.poll();
-				} catch (IOException e) {
-					throw new ExecutionException(e);
-				}
-				// we don't want to trigger timeout on first iteration
-				waitTimeOut = System.nanoTime() > end;
-			} while (!done && !waitTimeOut);
-			if (!done && waitTimeOut) {
-				throw new TimeoutException("get wait time out!");
-			}
-			if (exception != null) {
-				throw new ExecutionException(exception);
-			}
-		}
-		return this;
-	}
-
-	@Override
-	public boolean isSynchronous() {
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java
deleted file mode 100644
index 93d24d0..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import sun.misc.Unsafe;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.StorageResult;
-import org.apache.crail.storage.nvmf.NvmfStorageConstants;
-
-public abstract class NvmfStorageUnalignedFuture implements StorageFuture, StorageResult  {
-	protected final NvmfStorageFuture initFuture;
-	protected final NvmfStorageEndpoint endpoint;
-	protected final CrailBuffer buffer;
-	protected final long localOffset;
-	protected final BlockInfo remoteMr;
-	protected final long remoteOffset;
-	protected final int len;
-	protected final CrailBuffer stagingBuffer;
-	protected boolean done;
-	protected Exception exception;
-	protected static Unsafe unsafe;
-
-	public NvmfStorageUnalignedFuture(NvmfStorageFuture future, NvmfStorageEndpoint endpoint, CrailBuffer buffer,
-								   BlockInfo remoteMr, long remoteOffset, CrailBuffer stagingBuffer)
-			throws NoSuchFieldException, IllegalAccessException {
-		this.initFuture = future;
-		this.endpoint = endpoint;
-		this.buffer = buffer;
-		this.localOffset = buffer.position();
-		this.remoteMr = remoteMr;
-		this.remoteOffset = remoteOffset;
-		this.len = buffer.remaining();
-		this.stagingBuffer = stagingBuffer;
-		initUnsafe();
-		done = false;
-	}
-
-	public boolean isDone() {
-		if (!done) {
-			try {
-				get(0, TimeUnit.NANOSECONDS);
-			} catch (InterruptedException e) {
-				exception = e;
-			} catch (ExecutionException e) {
-				exception = e;
-			} catch (TimeoutException e) {
-				// i.e. operation is not finished
-			}
-		}
-		return done;
-	}
-
-	public int getLen() {
-		return len;
-	}
-
-	public boolean cancel(boolean b) {
-		return false;
-	}
-
-	public boolean isCancelled() {
-		return false;
-	}
-
-	public StorageResult get() throws InterruptedException, ExecutionException {
-		try {
-			return get(NvmfStorageConstants.TIME_OUT, NvmfStorageConstants.TIME_UNIT);
-		} catch (TimeoutException e) {
-			throw new ExecutionException(e);
-		}
-	}
-
-	private static void initUnsafe() throws NoSuchFieldException, IllegalAccessException {
-		if (unsafe == null) {
-			Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
-			theUnsafe.setAccessible(true);
-			unsafe = (Unsafe) theUnsafe.get(null);
-		}
-	}
-
-	@Override
-	public boolean isSynchronous() {
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java
deleted file mode 100644
index b0a5ae9..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.storage.StorageResult;
-
-public class NvmfStorageUnalignedRMWFuture extends NvmfStorageUnalignedFuture {
-
-	private boolean initDone;
-	private Future<StorageResult> writeFuture;
-
-	public NvmfStorageUnalignedRMWFuture(NvmfStorageFuture future, NvmfStorageEndpoint endpoint, CrailBuffer buffer,
-									  BlockInfo remoteMr, long remoteOffset, CrailBuffer stagingBuffer)
-			throws NoSuchFieldException, IllegalAccessException {
-		super(future, endpoint, buffer, remoteMr, remoteOffset, stagingBuffer);
-		initDone = false;
-	}
-
-	public StorageResult get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
-		if (exception != null) {
-			throw new ExecutionException(exception);
-		}
-		if (!done) {
-			if (!initDone) {
-				initFuture.get(l, timeUnit);
-				long srcAddr = buffer.address() + localOffset;
-				long dstAddr = stagingBuffer.address() + NvmfStorageUtils.namespaceSectorOffset(
-						endpoint.getSectorSize(), remoteOffset);
-				unsafe.copyMemory(srcAddr, dstAddr, len);
-
-				stagingBuffer.clear();
-				int alignedLen = (int) NvmfStorageUtils.alignLength(endpoint.getSectorSize(), remoteOffset, len);
-				stagingBuffer.limit(alignedLen);
-				try {
-					writeFuture = endpoint.write(stagingBuffer, remoteMr, NvmfStorageUtils.alignOffset(endpoint.getSectorSize(), remoteOffset));
-				} catch (IOException e) {
-					throw new ExecutionException(e);
-				}
-				initDone =true;
-			}
-			writeFuture.get(l, timeUnit);
-			try {
-				endpoint.putBuffer(stagingBuffer);
-			} catch (IOException e) {
-				throw new ExecutionException(e);
-			}
-			done = true;
-		}
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java
deleted file mode 100644
index 61f460e..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.storage.StorageResult;
-
-public class NvmfStorageUnalignedReadFuture extends NvmfStorageUnalignedFuture {
-
-	public NvmfStorageUnalignedReadFuture(NvmfStorageFuture future, NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo remoteMr,
-									   long remoteOffset, CrailBuffer stagingBuffer)
-			throws NoSuchFieldException, IllegalAccessException {
-		super(future, endpoint, buffer, remoteMr, remoteOffset, stagingBuffer);
-	}
-
-	public StorageResult get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
-		if (exception != null) {
-			throw new ExecutionException(exception);
-		}
-		if (!done) {
-			initFuture.get(l, timeUnit);
-			long srcAddr = stagingBuffer.address() + 
-					NvmfStorageUtils.namespaceSectorOffset(endpoint.getSectorSize(), remoteOffset);
-			long dstAddr = buffer.address() + localOffset;
-			unsafe.copyMemory(srcAddr, dstAddr, len);
-			done = true;
-			try {
-				endpoint.putBuffer(stagingBuffer);
-			} catch (IOException e) {
-				throw new ExecutionException(e);
-			}
-		}
-		return this;
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java
deleted file mode 100644
index a842033..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import com.ibm.disni.nvmef.spdk.NvmeStatusCodeType;
-
-import java.io.IOException;
-
-import org.apache.crail.CrailBuffer;
-
-/**
- * Created by jpf on 23.05.17.
- */
-public class NvmfStorageUnalignedWriteFuture extends NvmfStorageFuture {
-
-	private CrailBuffer stagingBuffer;
-
-	public NvmfStorageUnalignedWriteFuture(NvmfStorageEndpoint endpoint, int len, CrailBuffer stagingBuffer) {
-		super(endpoint, len);
-		this.stagingBuffer = stagingBuffer;
-	}
-
-	@Override
-	void signal(NvmeStatusCodeType statusCodeType, int statusCode) {
-		try {
-			endpoint.putBuffer(stagingBuffer);
-		} catch (IOException e) {
-			e.printStackTrace();
-			System.exit(-1);
-		}
-		super.signal(statusCodeType, statusCode);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java
deleted file mode 100644
index 562e495..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import org.apache.crail.metadata.BlockInfo;
-
-/**
- * Created by jpf on 14.02.17.
- */
-public class NvmfStorageUtils {
-
-	public static long linearBlockAddress(BlockInfo remoteMr, long remoteOffset, int sectorSize) {
-		return (remoteMr.getAddr() + remoteOffset) / (long)sectorSize;
-	}
-
-	public static long namespaceSectorOffset(int sectorSize, long fileOffset) {
-		return fileOffset % (long)sectorSize;
-	}
-
-	public static long alignLength(int sectorSize, long remoteOffset, long len) {
-		long alignedSize = len + namespaceSectorOffset(sectorSize, remoteOffset);
-		if (namespaceSectorOffset(sectorSize, alignedSize) != 0) {
-			alignedSize += (long)sectorSize - namespaceSectorOffset(sectorSize, alignedSize);
-		}
-		return alignedSize;
-	}
-
-	public static long alignOffset(int sectorSize, long fileOffset) {
-		return fileOffset - namespaceSectorOffset(sectorSize, fileOffset);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
new file mode 100644
index 0000000..08eab5b
--- /dev/null
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import org.apache.crail.CrailBuffer;
+import org.apache.crail.metadata.BlockInfo;
+import org.apache.crail.storage.StorageFuture;
+import org.apache.crail.storage.StorageResult;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+public class NvmfUnalignedWriteFuture implements StorageFuture {
+	private final NvmfStorageEndpoint endpoint;
+	private StorageFuture beginFuture;
+	private StorageFuture middleFuture;
+	private StorageFuture endFuture;
+	private final int written;
+	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
+	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
+
+	private final boolean isSectorAligned(long address) {
+		return address % endpoint.getLBADataSize() == 0;
+	}
+
+	private final long floorToSectorSize(long address) {
+		return address - (address % endpoint.getLBADataSize());
+	}
+
+	private final int leftInSector(long address) {
+		return endpoint.getLBADataSize() - offsetInSector(address);
+	}
+
+	private final int offsetInSector(long address) {
+		return (int)(address % endpoint.getLBADataSize());
+	}
+
+	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
+		this.endpoint = endpoint;
+		this.written = buffer.remaining();
+		/* assume blockInfo.getAddr() is sector aligned */
+		assert isSectorAligned(blockInfo.getAddr());
+
+		long nextRemoteOffset = remoteOffset;
+		/* beginning */
+		if (!isSectorAligned(remoteOffset)) {
+			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
+			nextRemoteOffset = remoteOffset + copySize;
+			int oldLimit = buffer.limit();
+			buffer.limit(buffer.position() + copySize);
+			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
+			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
+			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
+			if (beginBuffer == null) {
+				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
+				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
+				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
+			} else {
+				/* Wait for previous end operation to finish */
+				beginBuffer.getFuture().get();
+			}
+			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
+			stagingBuffer.position(offsetInSector(remoteOffset));
+			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
+			buffer.limit(oldLimit);
+			stagingBuffer.position(0);
+			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
+			beginBuffer.setFuture(beginFuture);
+			stagingBuffer.position(offsetInSector(remoteOffset));
+		}
+
+		/* middle */
+		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
+			int oldLimit = buffer.limit();
+			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
+			int toWrite = buffer.remaining();
+			middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
+			nextRemoteOffset += toWrite;
+			buffer.position(buffer.limit());
+			buffer.limit(oldLimit);
+		}
+
+		/* end */
+		if (buffer.remaining() > 0) {
+			endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + nextRemoteOffset);
+			CrailBuffer stagingBuffer = endBuffer.getBuffer();
+			stagingBuffer.position(0);
+			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
+			stagingBuffer.position(0);
+			endFuture = endpoint.write(stagingBuffer, blockInfo, nextRemoteOffset);
+			endBuffer.setFuture(endFuture);
+		}
+	}
+
+	@Override
+	public boolean isSynchronous() {
+		return false;
+	}
+
+	@Override
+	public boolean cancel(boolean b) {
+		return false;
+	}
+
+	@Override
+	public boolean isCancelled() {
+		return false;
+	}
+
+	private static boolean checkIfFutureIsDone(StorageFuture future) {
+		return (future != null && future.isDone()) || future == null;
+	}
+
+	@Override
+	public boolean isDone() {
+		if (beginFuture != null && beginFuture.isDone()) {
+			if (beginBuffer != null) {
+				beginBuffer.put();
+				beginBuffer = null;
+			}
+		}
+		if (endFuture != null && endFuture.isDone()) {
+			if (endBuffer != null) {
+				endBuffer.put();
+				endBuffer = null;
+			}
+		}
+		return beginBuffer == null && checkIfFutureIsDone(middleFuture) && endBuffer == null;
+	}
+
+	@Override
+	public StorageResult get() throws InterruptedException, ExecutionException {
+		try {
+			return get(2, TimeUnit.MINUTES);
+		} catch (TimeoutException e) {
+			throw new ExecutionException(e);
+		}
+	}
+
+	@Override
+	public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+		if (!isDone()) {
+			long start = System.nanoTime();
+			long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
+			boolean waitTimeOut;
+			do {
+				waitTimeOut = System.nanoTime() > end;
+			} while (!isDone() && !waitTimeOut);
+			if (!isDone() && waitTimeOut) {
+				throw new TimeoutException("poll wait time out!");
+			}
+		}
+		if (beginFuture != null) {
+			beginFuture.get();
+		}
+		if (middleFuture != null) {
+			middleFuture.get();
+		}
+		if (endFuture != null) {
+			endFuture.get();
+		}
+		return () -> written;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java b/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java
new file mode 100644
index 0000000..9d4d7bf
--- /dev/null
+++ b/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java
@@ -0,0 +1,73 @@
+package org.apache.crail.storage.nvmf.client;
+
+import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.conf.CrailConfiguration;
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.memory.MappedBufferCache;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class NvmfStagingBufferCacheTest {
+
+	@BeforeClass
+	public static void init() throws IOException {
+		CrailConstants.updateConstants(new CrailConfiguration());
+	}
+
+	private static CrailBufferCache bufferCache;
+	static CrailBufferCache getBufferCache() throws IOException {
+		if (bufferCache == null) {
+			bufferCache = new MappedBufferCache();
+		}
+		return bufferCache;
+	}
+
+
+	@Test(expected = IllegalArgumentException.class)
+	public void createBufferCache() throws IOException {
+		new NvmfStagingBufferCache(getBufferCache(), -1, 512);
+		new NvmfStagingBufferCache(getBufferCache(),0, 512);
+		new NvmfStagingBufferCache(getBufferCache(),1024, -1);
+		new NvmfStagingBufferCache(getBufferCache(),1024, 0);
+	}
+
+	@Test(expected = OutOfMemoryError.class)
+	public void outOfMemory() throws Exception {
+		NvmfStagingBufferCache bufferCache = new NvmfStagingBufferCache(getBufferCache(),1, 512);
+		NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry = bufferCache.get(0);
+		NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry2 = bufferCache.get(1);
+	}
+
+	@Test
+	public void bufferExists() throws Exception {
+		NvmfStagingBufferCache bufferCache = new NvmfStagingBufferCache(getBufferCache(),1, 512);
+		NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry = bufferCache.get(0);
+		NvmfStagingBufferCache.BufferCacheEntry existingBufferCacheEntry = bufferCache.getExisting(0);
+		assertEquals(bufferCacheEntry, existingBufferCacheEntry);
+	}
+
+	@Test
+	public void recycleBuffers() throws Exception {
+		NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry[] = new NvmfStagingBufferCache.BufferCacheEntry[5];
+		Set<CrailBuffer> buffers = new HashSet<>();
+		NvmfStagingBufferCache bufferCache = new NvmfStagingBufferCache(getBufferCache(), bufferCacheEntry.length, 512);
+		for (int i = 0; i < bufferCacheEntry.length; i++) {
+			bufferCacheEntry[i] = bufferCache.get(i);
+			buffers.add(bufferCacheEntry[i].getBuffer());
+			bufferCacheEntry[i].put();
+		}
+		for (int i = 0; i < bufferCacheEntry.length; i++) {
+			bufferCacheEntry[i] = bufferCache.get(i + bufferCacheEntry.length);
+			assertTrue(buffers.remove(bufferCacheEntry[i].getBuffer()));
+		}
+	}
+}
\ No newline at end of file


[15/16] incubator-crail git commit: Crail internal version to 3100

Posted by pe...@apache.org.
Crail internal version to 3100

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/dc50fc19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/dc50fc19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/dc50fc19

Branch: refs/heads/master
Commit: dc50fc1937c8477dff507c2b148124ed280f802c
Parents: 89820d9
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 13:08:59 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../org/apache/crail/conf/CrailConstants.java   | 162 +++++++++----------
 1 file changed, 81 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/dc50fc19/client/src/main/java/org/apache/crail/conf/CrailConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/crail/conf/CrailConstants.java b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
index 4628906..313c544 100644
--- a/client/src/main/java/org/apache/crail/conf/CrailConstants.java
+++ b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
@@ -26,113 +26,113 @@ import org.slf4j.Logger;
 
 public class CrailConstants {
 	private static final Logger LOG = CrailUtils.getLogger();
-	
+
 	public static final String VERSION_KEY = "crail.version";
-	public static int VERSION = 3004;
-	
+	public static int VERSION = 3100;
+
 	public static final String DIRECTORY_DEPTH_KEY = "crail.directorydepth";
 	public static int DIRECTORY_DEPTH = 16;
-	
+
 	public static final String TOKEN_EXPIRATION_KEY = "crail.tokenexpiration";
 	public static long TOKEN_EXPIRATION = 10;
-	
+
 	public static final String BLOCK_SIZE_KEY = "crail.blocksize";
 	public static long BLOCK_SIZE = 67108864;
-	
+
 	public static final String CACHE_LIMIT_KEY = "crail.cachelimit";
 	public static long CACHE_LIMIT = 1073741824;
-	
+
 	public static final String CACHE_PATH_KEY = "crail.cachepath";
-	public static String CACHE_PATH = "/home/stu/craildata/cache";	
-	
+	public static String CACHE_PATH = "/home/stu/craildata/cache";
+
 	public static final String USER_KEY = "crail.user";
 	public static String USER = "stu";
-	
+
 	public static final String SHADOW_REPLICATION_KEY = "crail.shadowreplication";
-	public static int SHADOW_REPLICATION = 1;	
-	
+	public static int SHADOW_REPLICATION = 1;
+
 	public static final String DEBUG_KEY = "crail.debug";
 	public static boolean DEBUG = false;
-	
+
 	public static final String STATISTICS_KEY = "crail.statistics";
-	public static boolean STATISTICS = true;	
-	
+	public static boolean STATISTICS = true;
+
 	public static final String RPC_TIMEOUT_KEY = "crail.rpctimeout";
 	public static int RPC_TIMEOUT = 1000;
-	
+
 	public static final String DATA_TIMEOUT_KEY = "crail.datatimeout";
-	public static int DATA_TIMEOUT = 1000;	
-	
+	public static int DATA_TIMEOUT = 1000;
+
 	public static final String BUFFER_SIZE_KEY = "crail.buffersize";
 	public static int BUFFER_SIZE = 1048576;
-	
+
 	public static final String SLICE_SIZE_KEY = "crail.slicesize";
-	public static int SLICE_SIZE = 524288;		
-	
+	public static int SLICE_SIZE = 524288;
+
 	public static final String SINGLETON_KEY = "crail.singleton";
-	public static boolean SINGLETON = false;	
-	
+	public static boolean SINGLETON = false;
+
 	public static final String REGION_SIZE_KEY = "crail.regionsize";
 	public static long REGION_SIZE = 1073741824;
-	
+
 	public static final String DIRECTORY_RECORD_KEY = "crail.directoryrecord";
-	public static int DIRECTORY_RECORD = 512;	
-	
+	public static int DIRECTORY_RECORD = 512;
+
 	public static final String DIRECTORY_RANDOMIZE_KEY = "crail.directoryrandomize";
 	public static boolean DIRECTORY_RANDOMIZE = true;
-	
+
 	public static final String CACHE_IMPL_KEY = "crail.cacheimpl";
 	public static String CACHE_IMPL = "org.apache.crail.memory.MappedBufferCache";
-	
+
 	public static final String LOCATION_MAP_KEY = "crail.locationmap";
-	public static String LOCATION_MAP = "";		
-	
+	public static String LOCATION_MAP = "";
+
 	//namenode interface
 	public static final String NAMENODE_ADDRESS_KEY = "crail.namenode.address";
 	public static String NAMENODE_ADDRESS = "";
-	
+
 	public static final String NAMENODE_FILEBLOCKS_KEY = "crail.namenode.fileblocks";
-	public static int NAMENODE_FILEBLOCKS = 16;	
-	
+	public static int NAMENODE_FILEBLOCKS = 16;
+
 	public static final String NAMENODE_BLOCKSELECTION_KEY = "crail.namenode.blockselection";
-	public static String NAMENODE_BLOCKSELECTION = "roundrobin";	
-	
+	public static String NAMENODE_BLOCKSELECTION = "roundrobin";
+
 	public static final String NAMENODE_RPC_TYPE_KEY = "crail.namenode.rpctype";
 	public static String NAMENODE_RPC_TYPE = "org.apache.crail.namenode.rpc.tcp.TcpNameNode";
-	
+
 	public static final String NAMENODE_RPC_SERVICE_KEY = "crail.namenode.rpcservice";
-	public static String NAMENODE_RPC_SERVICE = "org.apache.crail.namenode.NameNodeService";	
-	
+	public static String NAMENODE_RPC_SERVICE = "org.apache.crail.namenode.NameNodeService";
+
 	public static final String NAMENODE_LOG_KEY = "crail.namenode.log";
-	public static String NAMENODE_LOG = "";		
-	
+	public static String NAMENODE_LOG = "";
+
 	//storage interface
 	public static final String STORAGE_TYPES_KEY = "crail.storage.types";
-	public static String STORAGE_TYPES = "org.apache.crail.storage.tcp.TcpStorageTier";		
-	
+	public static String STORAGE_TYPES = "org.apache.crail.storage.tcp.TcpStorageTier";
+
 	public static final String STORAGE_CLASSES_KEY = "crail.storage.classes";
-	public static int STORAGE_CLASSES = 1;	
-	
+	public static int STORAGE_CLASSES = 1;
+
 	public static final String STORAGE_ROOTCLASS_KEY = "crail.storage.rootclass";
-	public static int STORAGE_ROOTCLASS = 0;		
-	
+	public static int STORAGE_ROOTCLASS = 0;
+
 	public static final String STORAGE_KEEPALIVE_KEY = "crail.storage.keepalive";
-	public static int STORAGE_KEEPALIVE = 2;		
-	
+	public static int STORAGE_KEEPALIVE = 2;
+
 	public static void updateConstants(CrailConfiguration conf){
 		//general
 		if (conf.get(DIRECTORY_DEPTH_KEY) != null) {
 			DIRECTORY_DEPTH = Integer.parseInt(conf.get(DIRECTORY_DEPTH_KEY));
-		}			
+		}
 		if (conf.get(TOKEN_EXPIRATION_KEY) != null) {
 			TOKEN_EXPIRATION = Long.parseLong(conf.get(TOKEN_EXPIRATION_KEY));
-		}		
+		}
 		if (conf.get(BLOCK_SIZE_KEY) != null) {
 			BLOCK_SIZE = Long.parseLong(conf.get(BLOCK_SIZE_KEY));
-		}			
+		}
 		if (conf.get(CACHE_LIMIT_KEY) != null) {
 			CACHE_LIMIT = Long.parseLong(conf.get(CACHE_LIMIT_KEY));
-		}			
+		}
 		if (conf.get(CACHE_PATH_KEY) != null) {
 			CACHE_PATH = conf.get(CACHE_PATH_KEY);
 		}
@@ -141,34 +141,34 @@ public class CrailConstants {
 		}
 		if (conf.get(SHADOW_REPLICATION_KEY) != null) {
 			SHADOW_REPLICATION = Integer.parseInt(conf.get(SHADOW_REPLICATION_KEY));
-		}	
+		}
 		if (conf.get(DEBUG_KEY) != null) {
 			DEBUG = Boolean.parseBoolean(conf.get(DEBUG_KEY));
-		}	
+		}
 		if (conf.get(STATISTICS_KEY) != null) {
 			STATISTICS = Boolean.parseBoolean(conf.get(STATISTICS_KEY));
-		}			
+		}
 		if (conf.get(RPC_TIMEOUT_KEY) != null) {
 			RPC_TIMEOUT = Integer.parseInt(conf.get(RPC_TIMEOUT_KEY));
-		}	
+		}
 		if (conf.get(DATA_TIMEOUT_KEY) != null) {
 			DATA_TIMEOUT = Integer.parseInt(conf.get(DATA_TIMEOUT_KEY));
-		}	
+		}
 		if (conf.get(BUFFER_SIZE_KEY) != null) {
 			BUFFER_SIZE = Integer.parseInt(conf.get(BUFFER_SIZE_KEY));
-		}	
+		}
 		if (conf.get(SLICE_SIZE_KEY) != null) {
 			SLICE_SIZE = Integer.parseInt(conf.get(SLICE_SIZE_KEY));
-		}			
+		}
 		if (conf.get(CrailConstants.SINGLETON_KEY) != null) {
 			SINGLETON = conf.getBoolean(CrailConstants.SINGLETON_KEY, false);
-		}	
+		}
 		if (conf.get(REGION_SIZE_KEY) != null) {
 			REGION_SIZE = Integer.parseInt(conf.get(REGION_SIZE_KEY));
-		}	
+		}
 		if (conf.get(DIRECTORY_RECORD_KEY) != null) {
 			DIRECTORY_RECORD = Integer.parseInt(conf.get(DIRECTORY_RECORD_KEY));
-		}	
+		}
 		if (conf.get(CrailConstants.DIRECTORY_RANDOMIZE_KEY) != null) {
 			DIRECTORY_RANDOMIZE = conf.getBoolean(CrailConstants.DIRECTORY_RANDOMIZE_KEY, false);
 		}
@@ -177,42 +177,42 @@ public class CrailConstants {
 		}
 		if (conf.get(LOCATION_MAP_KEY) != null) {
 			LOCATION_MAP = conf.get(LOCATION_MAP_KEY);
-		}		
-		
+		}
+
 		//namenode interface
 		if (conf.get(NAMENODE_ADDRESS_KEY) != null) {
 			NAMENODE_ADDRESS = conf.get(NAMENODE_ADDRESS_KEY);
-		} 
+		}
 		if (conf.get(NAMENODE_BLOCKSELECTION_KEY) != null) {
 			NAMENODE_BLOCKSELECTION = conf.get(NAMENODE_BLOCKSELECTION_KEY);
-		}		
+		}
 		if (conf.get(NAMENODE_FILEBLOCKS_KEY) != null) {
 			NAMENODE_FILEBLOCKS = Integer.parseInt(conf.get(NAMENODE_FILEBLOCKS_KEY));
-		}		
+		}
 		if (conf.get(NAMENODE_RPC_TYPE_KEY) != null) {
 			NAMENODE_RPC_TYPE = conf.get(NAMENODE_RPC_TYPE_KEY);
 		}
 		if (conf.get(NAMENODE_LOG_KEY) != null) {
 			NAMENODE_LOG = conf.get(NAMENODE_LOG_KEY);
-		}		
-		
+		}
+
 		//storage interface
 		if (conf.get(STORAGE_TYPES_KEY) != null) {
 			STORAGE_TYPES = conf.get(STORAGE_TYPES_KEY);
-		}	
+		}
 		if (conf.get(STORAGE_CLASSES_KEY) != null) {
-			STORAGE_CLASSES = Math.max(Integer.parseInt(conf.get(STORAGE_CLASSES_KEY)), CrailUtils.getStorageClasses(STORAGE_TYPES)); 
+			STORAGE_CLASSES = Math.max(Integer.parseInt(conf.get(STORAGE_CLASSES_KEY)), CrailUtils.getStorageClasses(STORAGE_TYPES));
 		} else {
 			STORAGE_CLASSES = CrailUtils.getStorageClasses(STORAGE_TYPES);
 		}
 		if (conf.get(STORAGE_ROOTCLASS_KEY) != null) {
 			STORAGE_ROOTCLASS = Integer.parseInt(conf.get(STORAGE_ROOTCLASS_KEY));
-		}	
+		}
 		if (conf.get(STORAGE_KEEPALIVE_KEY) != null) {
 			STORAGE_KEEPALIVE = Integer.parseInt(conf.get(STORAGE_KEEPALIVE_KEY));
-		}		
+		}
 	}
-	
+
 	public static void printConf(){
 		LOG.info(VERSION_KEY + " " + VERSION);
 		LOG.info(DIRECTORY_DEPTH_KEY + " " + DIRECTORY_DEPTH);
@@ -227,14 +227,14 @@ public class CrailConstants {
 		LOG.info(RPC_TIMEOUT_KEY + " " + RPC_TIMEOUT);
 		LOG.info(DATA_TIMEOUT_KEY + " " + DATA_TIMEOUT);
 		LOG.info(BUFFER_SIZE_KEY + " " + BUFFER_SIZE);
-		LOG.info(SLICE_SIZE_KEY + " " + SLICE_SIZE);		
+		LOG.info(SLICE_SIZE_KEY + " " + SLICE_SIZE);
 		LOG.info(SINGLETON_KEY + " " + SINGLETON);
 		LOG.info(REGION_SIZE_KEY + " " + REGION_SIZE);
 		LOG.info(DIRECTORY_RECORD_KEY + " " + DIRECTORY_RECORD);
-		LOG.info(DIRECTORY_RANDOMIZE_KEY + " " + DIRECTORY_RANDOMIZE);		
+		LOG.info(DIRECTORY_RANDOMIZE_KEY + " " + DIRECTORY_RANDOMIZE);
 		LOG.info(CACHE_IMPL_KEY + " " + CACHE_IMPL);
 		LOG.info(LOCATION_MAP_KEY + " " + LOCATION_MAP);
-		LOG.info(NAMENODE_ADDRESS_KEY + " " + NAMENODE_ADDRESS);		
+		LOG.info(NAMENODE_ADDRESS_KEY + " " + NAMENODE_ADDRESS);
 		LOG.info(NAMENODE_BLOCKSELECTION_KEY + " " + NAMENODE_BLOCKSELECTION);
 		LOG.info(NAMENODE_FILEBLOCKS_KEY + " " + NAMENODE_FILEBLOCKS);
 		LOG.info(NAMENODE_RPC_TYPE_KEY + " " + NAMENODE_RPC_TYPE);
@@ -244,17 +244,17 @@ public class CrailConstants {
 		LOG.info(STORAGE_ROOTCLASS_KEY + " " + STORAGE_ROOTCLASS);
 		LOG.info(STORAGE_KEEPALIVE_KEY + " " + STORAGE_KEEPALIVE);
 	}
-	
+
 	public static void verify() throws IOException {
 		if (CrailConstants.BUFFER_SIZE % CrailConstants.DIRECTORY_RECORD != 0){
 			throw new IOException("crail.buffersize must be multiple of " + CrailConstants.DIRECTORY_RECORD);
-		}	
+		}
 		if (Math.max(CrailConstants.BUFFER_SIZE, CrailConstants.SLICE_SIZE) % Math.min(CrailConstants.BUFFER_SIZE, CrailConstants.SLICE_SIZE) != 0){
 			throw new IOException("crail.slicesize must be multiple of buffersize " + CrailConstants.BUFFER_SIZE);
 		}
 		if (CrailConstants.STORAGE_CLASSES < CrailUtils.getStorageClasses(STORAGE_TYPES)){
 			throw new IOException("crail.storage.classes cannot be smaller than the number of storage types " + CrailUtils.getStorageClasses(STORAGE_TYPES));
-		}		
-		
+		}
+
 	}
 }


[09/16] incubator-crail git commit: Nvmf Tier: keep alive timer constant

Posted by pe...@apache.org.
Nvmf Tier: keep alive timer constant

Introduce a new keep-alive timer constant. For now we don't make
this configurable since jNVMf does not allow changing it yet, i.e.
it tells the target to use the default 2-minute keep-alive timer.
We set it to 110 seconds so as not to miss the 2-minute deadline.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/6eb895c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/6eb895c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/6eb895c2

Branch: refs/heads/master
Commit: 6eb895c2ea273bbe70f2cf51aed9b6323ea83ef7
Parents: fb01cdf
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 10:46:03 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/crail/storage/nvmf/NvmfStorageClient.java    | 4 +---
 .../java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java | 4 ++++
 .../java/org/apache/crail/storage/nvmf/NvmfStorageServer.java    | 2 +-
 3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/6eb895c2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
index cccf596..e352034 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
@@ -34,7 +34,6 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 
 public class NvmfStorageClient implements StorageClient {
 	private static final Logger LOG = CrailUtils.getLogger();
@@ -60,9 +59,8 @@ public class NvmfStorageClient implements StorageClient {
 						return;
 					}
 				}
-				/* We use the default keep alive timer of 120s in jNVMf */
 				try {
-					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
+					Thread.sleep(NvmfStorageConstants.KEEP_ALIVE_INTERVAL_MS);
 				} catch (InterruptedException e) {
 					return;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/6eb895c2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
index 8ce132b..78b940b 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
 
 public class NvmfStorageConstants {
 
@@ -55,6 +56,9 @@ public class NvmfStorageConstants {
 	public static final String STAGING_CACHE_SIZE_KEY = "stagingcachesize";
 	public static int STAGING_CACHE_SIZE = 262144;
 
+	/* We use the default keep alive timer of 120s in jNVMf */
+	public static long KEEP_ALIVE_INTERVAL_MS = TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS);
+
 	private static String fullKey(String key) {
 		return PREFIX + "." + key;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/6eb895c2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
index b15cdc9..d41c406 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
@@ -88,7 +88,7 @@ public class NvmfStorageServer implements StorageServer {
 		LOG.info("NnvmfStorageServer started with NVMf target " + getAddress());
 		while (isAlive) {
 			try {
-				Thread.sleep(1000 /* ms */);
+				Thread.sleep(NvmfStorageConstants.KEEP_ALIVE_INTERVAL_MS);
 				controller.keepAlive();
 			} catch (Exception e) {
 				e.printStackTrace();


[12/16] incubator-crail git commit: NvmfStorageEndpoint: Number of logical blocks fix type

Posted by pe...@apache.org.
NvmfStorageEndpoint: Number of logical blocks fix type

jNVMf API has been changed to take int instead of long, fix code
accordingly.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/4a303c82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/4a303c82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/4a303c82

Branch: refs/heads/master
Commit: 4a303c827a6fb68781bf2380b85a1ffa9cdfc8c3
Parents: 176bc39
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 11:03:24 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/4a303c82/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index 655a304..58c7543 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -204,7 +204,7 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 		long startingLBA = startingAddress / getLBADataSize();
 		sqe.setStartingLba(startingLBA);
 		/* TODO: on read this potentially overwrites data beyond the set limit */
-		short numLogicalBlocks = (short)getNumLogicalBlocks(buffer);
+		int numLogicalBlocks = getNumLogicalBlocks(buffer);
 		buffer.limit(buffer.position() + numLogicalBlocks * getLBADataSize());
 		sqe.setNumberOfLogicalBlocks(numLogicalBlocks);
 		int remoteKey = registeredBufferCache.getRemoteKey(buffer);


[06/16] incubator-crail git commit: NvmfRegisteredBufferCache: simplify

Posted by pe...@apache.org.
NvmfRegisteredBufferCache: simplify

Simplify buffer cache by no longer holding references to CrailBuffers.
We only return the corresponding key now instead of a wrapped
KeyedNativeBuffer.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/db754995
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/db754995
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/db754995

Branch: refs/heads/master
Commit: db754995cb9ce42e97bbd131a117a315877c31f0
Parents: 50cf268
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Mon Apr 9 15:22:41 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:02 2018 +0200

----------------------------------------------------------------------
 storage-nvmf/pom.xml                            |  2 +-
 .../nvmf/client/NvmfRegisteredBufferCache.java  | 51 ++++----------------
 .../nvmf/client/NvmfStorageEndpoint.java        |  9 ++--
 3 files changed, 15 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/db754995/storage-nvmf/pom.xml
----------------------------------------------------------------------
diff --git a/storage-nvmf/pom.xml b/storage-nvmf/pom.xml
index 20ad294..0e06569 100644
--- a/storage-nvmf/pom.xml
+++ b/storage-nvmf/pom.xml
@@ -24,7 +24,7 @@
     <dependency>
       <groupId>com.ibm.jnvmf</groupId>
       <artifactId>jnvmf</artifactId>
-      <version>1.0</version>
+      <version>1.2</version>
     </dependency>
     <dependency>
       <groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/db754995/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
index 0a364e8..ec9ee52 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
@@ -21,7 +21,6 @@ package org.apache.crail.storage.nvmf.client;
 
 import com.ibm.jnvmf.Freeable;
 import com.ibm.jnvmf.KeyedNativeBuffer;
-import com.ibm.jnvmf.NativeByteBuffer;
 import com.ibm.jnvmf.QueuePair;
 import org.apache.crail.CrailBuffer;
 
@@ -31,68 +30,36 @@ import java.util.concurrent.ConcurrentHashMap;
 
 class NvmfRegisteredBufferCache implements Freeable {
 	private final QueuePair queuePair;
-	private final Map<CrailBuffer, KeyedNativeBuffer> bufferMap;
 	private final Map<Long, KeyedNativeBuffer> regionMap;
 	private boolean valid;
 
 	public NvmfRegisteredBufferCache(QueuePair queuePair) {
 		this.queuePair = queuePair;
-		this.bufferMap = new ConcurrentHashMap<>();
 		this.regionMap = new ConcurrentHashMap<>();
 		this.valid = true;
 	}
 
-	static class Buffer extends NativeByteBuffer implements KeyedNativeBuffer {
-		private final KeyedNativeBuffer registeredRegionBuffer;
-
-		Buffer(CrailBuffer buffer, KeyedNativeBuffer registeredRegionBuffer) {
-			super(buffer.getByteBuffer());
-			this.registeredRegionBuffer = registeredRegionBuffer;
-		}
-
-		@Override
-		public int getRemoteKey() {
-			return registeredRegionBuffer.getRemoteKey();
-		}
-
-		@Override
-		public int getLocalKey() {
-			return registeredRegionBuffer.getLocalKey();
-		}
-	}
-
-	KeyedNativeBuffer get(CrailBuffer buffer) throws IOException {
-		KeyedNativeBuffer keyedNativeBuffer = bufferMap.get(buffer);
+	int getRemoteKey(CrailBuffer buffer) throws IOException {
+		CrailBuffer regionBuffer = buffer.getRegion();
+		KeyedNativeBuffer keyedNativeBuffer = regionMap.get(regionBuffer.address());
 		if (keyedNativeBuffer == null) {
-			CrailBuffer regionBuffer = buffer.getRegion();
-			keyedNativeBuffer = regionMap.get(regionBuffer.address());
-			if (keyedNativeBuffer == null) {
-				/* region has not been registered yet */
-				keyedNativeBuffer = queuePair.registerMemory(regionBuffer.getByteBuffer());
-				KeyedNativeBuffer prevKeyedNativeBuffer =
-						regionMap.putIfAbsent(keyedNativeBuffer.getAddress(), keyedNativeBuffer);
-				if (prevKeyedNativeBuffer != null) {
-					/* someone registered the same region in parallel */
-					keyedNativeBuffer.free();
-					keyedNativeBuffer = prevKeyedNativeBuffer;
-				}
-			}
-			keyedNativeBuffer = new Buffer(buffer, keyedNativeBuffer);
+			/* region has not been registered yet */
+			keyedNativeBuffer = queuePair.registerMemory(regionBuffer.getByteBuffer());
 			KeyedNativeBuffer prevKeyedNativeBuffer =
-					bufferMap.putIfAbsent(buffer, keyedNativeBuffer);
+					regionMap.putIfAbsent(keyedNativeBuffer.getAddress(), keyedNativeBuffer);
 			if (prevKeyedNativeBuffer != null) {
-				/* someone added the same buffer parallel */
+				/* someone registered the same region in parallel */
 				keyedNativeBuffer.free();
 				keyedNativeBuffer = prevKeyedNativeBuffer;
 			}
 		}
-		return keyedNativeBuffer;
+		return keyedNativeBuffer.getRemoteKey();
 	}
 
 
 	@Override
 	public void free() throws IOException {
-		for (KeyedNativeBuffer buffer : bufferMap.values()) {
+		for (KeyedNativeBuffer buffer : regionMap.values()) {
 			buffer.free();
 		}
 		valid = false;

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/db754995/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index 8188d82..e17a013 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -206,10 +206,11 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 		/* TODO: on read this potentially overwrites data beyond the set limit */
 		short numLogicalBlocks = (short)(getNumLogicalBlocks(buffer) - 1);
 		sqe.setNumberOfLogicalBlocks(numLogicalBlocks);
-		KeyedNativeBuffer registeredBuffer = registeredBufferCache.get(buffer);
-		registeredBuffer.position(buffer.position());
-		registeredBuffer.limit(registeredBuffer.position() + (numLogicalBlocks + 1) * getLBADataSize());
-		command.getCommandCapsule().setSglDescriptor(registeredBuffer);
+		int remoteKey = registeredBufferCache.getRemoteKey(buffer);
+		KeyedSglDataBlockDescriptor dataBlockDescriptor = sqe.getKeyedSglDataBlockDescriptor();
+		dataBlockDescriptor.setAddress(buffer.address() + buffer.position());
+		dataBlockDescriptor.setLength(buffer.remaining());
+		dataBlockDescriptor.setKey(remoteKey);
 
 		command.execute(response);
 


[05/16] incubator-crail git commit: NvmfStorageClient: keep alive thread

Posted by pe...@apache.org.
NvmfStorageClient: keep alive thread

Start a keep alive thread which sends a keep alive message to
each controller every 110s (timeout 120s). In between, the thread sleeps.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/ac9e2990
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/ac9e2990
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/ac9e2990

Branch: refs/heads/master
Commit: ac9e2990703ab0d235548851b327c985f453cbcc
Parents: 3845c27
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Mon Apr 9 15:16:49 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:02 2018 +0200

----------------------------------------------------------------------
 .../crail/storage/nvmf/NvmfStorageClient.java   | 43 ++++++++++++++++++--
 .../nvmf/client/NvmfStorageEndpoint.java        |  4 ++
 2 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/ac9e2990/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
index d9dd976..cccf596 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
@@ -34,18 +34,44 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 
 public class NvmfStorageClient implements StorageClient {
 	private static final Logger LOG = CrailUtils.getLogger();
 	private static Nvme nvme;
 	private boolean initialized;
-	private List<StorageEndpoint> endpoints;
+	private volatile boolean closing;
+	private final Thread keepAliveThread;
+	private List<NvmfStorageEndpoint> endpoints;
 	private CrailStatistics statistics;
 	private CrailBufferCache bufferCache;
 
 	public NvmfStorageClient() {
 		this.initialized = false;
 		this.endpoints = new CopyOnWriteArrayList<>();
+		this.closing = false;
+		this.keepAliveThread = new Thread(() -> {
+			while (!closing) {
+				for (NvmfStorageEndpoint endpoint : endpoints) {
+					try {
+						endpoint.keepAlive();
+					} catch (IOException e) {
+						e.printStackTrace();
+						return;
+					}
+				}
+				/* We use the default keep alive timer of 120s in jNVMf */
+				try {
+					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
+				} catch (InterruptedException e) {
+					return;
+				}
+			}
+		});
+	}
+
+	boolean isValid() {
+		return keepAliveThread.isAlive();
 	}
 
 	public void init(CrailStatistics statistics, CrailBufferCache bufferCache, CrailConfiguration crailConfiguration,
@@ -58,6 +84,7 @@ public class NvmfStorageClient implements StorageClient {
 		this.bufferCache = bufferCache;
 		LOG.info("Initialize Nvmf storage client");
 		NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
+		keepAliveThread.start();
 	}
 
 	public void printConf(Logger logger) {
@@ -72,14 +99,22 @@ public class NvmfStorageClient implements StorageClient {
 	}
 
 	public synchronized StorageEndpoint createEndpoint(DataNodeInfo info) throws IOException {
-		StorageEndpoint endpoint = new NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache);
+		if (!isValid()) {
+			throw new IOException("Storage client state not valid");
+		}
+		NvmfStorageEndpoint endpoint = new NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache);
 		endpoints.add(endpoint);
 		return endpoint;
 	}
 
 	public void close() throws Exception {
-		for (StorageEndpoint endpoint : endpoints) {
-			endpoint.close();
+		if (!closing) {
+			closing = true;
+			keepAliveThread.interrupt();
+			keepAliveThread.join();
+			for (StorageEndpoint endpoint : endpoints) {
+				endpoint.close();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/ac9e2990/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index 1661349..c9b17de 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -121,6 +121,10 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 		this.statistics = statistics;
 	}
 
+	public void keepAlive() throws IOException {
+		controller.keepAlive();
+	}
+
 	public int getLBADataSize() {
 		return lbaDataSize;
 	}


[16/16] incubator-crail git commit: NvmfStorageClient: number of LBAs

Posted by pe...@apache.org.
NvmfStorageClient: number of LBAs

Number of LBAs in new jNVMf API is no longer 0-based.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/fb01cdf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/fb01cdf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/fb01cdf0

Branch: refs/heads/master
Commit: fb01cdf06649ced6a4df45f679d4278f664cdba0
Parents: db75499
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Mon Apr 9 15:24:41 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/fb01cdf0/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index e17a013..655a304 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -204,7 +204,8 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 		long startingLBA = startingAddress / getLBADataSize();
 		sqe.setStartingLba(startingLBA);
 		/* TODO: on read this potentially overwrites data beyond the set limit */
-		short numLogicalBlocks = (short)(getNumLogicalBlocks(buffer) - 1);
+		short numLogicalBlocks = (short)getNumLogicalBlocks(buffer);
+		buffer.limit(buffer.position() + numLogicalBlocks * getLBADataSize());
 		sqe.setNumberOfLogicalBlocks(numLogicalBlocks);
 		int remoteKey = registeredBufferCache.getRemoteKey(buffer);
 		KeyedSglDataBlockDescriptor dataBlockDescriptor = sqe.getKeyedSglDataBlockDescriptor();


[14/16] incubator-crail git commit: NvmfStorageConstants: Allocation size integer

Posted by pe...@apache.org.
NvmfStorageConstants: Allocation size integer

Change allocationsize constant to be an integer, since we do not
support allocation sizes > MAXINT.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/34e0af4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/34e0af4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/34e0af4e

Branch: refs/heads/master
Commit: 34e0af4ee0a1c246d91d67e2e0e1547f9ad43a70
Parents: a9162a2
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 10:56:37 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../org/apache/crail/storage/nvmf/NvmfStorageConstants.java   | 7 ++++---
 .../java/org/apache/crail/storage/nvmf/NvmfStorageServer.java | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/34e0af4e/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
index 78b940b..56946c8 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
@@ -48,7 +48,7 @@ public class NvmfStorageConstants {
 	public static NamespaceIdentifier NAMESPACE = new NamespaceIdentifier(1);
 
 	public static final String ALLOCATION_SIZE_KEY = "allocationsize";
-	public static long ALLOCATION_SIZE = 1073741824; /* 1GB */
+	public static int ALLOCATION_SIZE = 1073741824; /* 1GB */
 
 	public static final String QUEUE_SIZE_KEY = "queueSize";
 	public static int QUEUE_SIZE = 64;
@@ -85,7 +85,7 @@ public class NvmfStorageConstants {
 
 		arg = get(conf, ALLOCATION_SIZE_KEY);
 		if (arg != null) {
-			ALLOCATION_SIZE = Long.parseLong(arg);
+			ALLOCATION_SIZE = Integer.parseInt(arg);
 		}
 
 		arg = get(conf, QUEUE_SIZE_KEY);
@@ -101,7 +101,8 @@ public class NvmfStorageConstants {
 
 	public static void verify() {
 		if (ALLOCATION_SIZE % CrailConstants.BLOCK_SIZE != 0){
-			throw new IllegalArgumentException("allocationsize must be multiple of crail.blocksize");
+			throw new IllegalArgumentException(fullKey(ALLOCATION_SIZE_KEY) + " (" + ALLOCATION_SIZE +
+					") must be multiple of crail.blocksize (" + CrailConstants.BLOCK_SIZE + ")");
 		}
 		if (QUEUE_SIZE < 0) {
 			throw new IllegalArgumentException("Queue size negative");

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/34e0af4e/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
index d41c406..f632852 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
@@ -105,7 +105,7 @@ public class NvmfStorageServer implements StorageServer {
 			LOG.info("new block, length " + NvmfStorageConstants.ALLOCATION_SIZE);
 			LOG.debug("block stag 0, address " + address + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
 			alignedSize -= NvmfStorageConstants.ALLOCATION_SIZE;
-			resource = StorageResource.createResource(address, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
+			resource = StorageResource.createResource(address, NvmfStorageConstants.ALLOCATION_SIZE, 0);
 			address += NvmfStorageConstants.ALLOCATION_SIZE;
 		}
 


[02/16] incubator-crail git commit: NvmfFuture: fix concurrency bug

Posted by pe...@apache.org.
NvmfFuture: fix concurrency bug

Fix bug where the completed counter was incremented non-atomically
in parallel and therefore never reached 2.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/feb765c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/feb765c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/feb765c3

Branch: refs/heads/master
Commit: feb765c3143d35f3526909b1b69aa33790b21a9d
Parents: ac9e299
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Mon Apr 9 15:18:40 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:02 2018 +0200

----------------------------------------------------------------------
 .../org/apache/crail/storage/nvmf/client/NvmfFuture.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/feb765c3/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
index 639f568..885121d 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
@@ -28,16 +28,17 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class NvmfFuture<Command extends NvmIoCommand<? extends NvmIoCommandCapsule>> implements StorageFuture, OperationCallback {
 	private final NvmfStorageEndpoint endpoint;
 	private final Command command;
 	private final Queue<Command> operations;
-	private boolean done;
+	private volatile boolean done;
 	private RdmaException exception;
 	private final StorageResult storageResult;
 	private final Response<NvmResponseCapsule> response;
-	private int completed;
+	private final AtomicInteger completed;
 
 	NvmfFuture(NvmfStorageEndpoint endpoint, Command command, Response<NvmResponseCapsule> response,
 			   Queue<Command> operations, int length) {
@@ -47,7 +48,7 @@ public class NvmfFuture<Command extends NvmIoCommand<? extends NvmIoCommandCapsu
 		this.done = false;
 		this.storageResult = () -> length;
 		this.response = response;
-		this.completed = 0;
+		this.completed = new AtomicInteger(0);
 	}
 
 	@Override
@@ -129,8 +130,8 @@ public class NvmfFuture<Command extends NvmIoCommand<? extends NvmIoCommandCapsu
 	@Override
 	public void onComplete() {
 		assert !done;
-		assert completed < 2;
-		if (++completed == 2) {
+		assert completed.get() < 2;
+		if (completed.incrementAndGet() == 2) {
 			/* we need to complete command and response */
 			operations.add(command);
 			this.done = true;


[08/16] incubator-crail git commit: NvmfStorageClient: getEndpointGroup private

Posted by pe...@apache.org.
NvmfStorageClient: getEndpointGroup private

Make getEndpointGroup private: it should not be used outside of
this class and does not handle concurrent execution.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/4caf5a0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/4caf5a0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/4caf5a0e

Branch: refs/heads/master
Commit: 4caf5a0e2a176fbfd36b5d66bf5935f3366f629b
Parents: 6eb895c
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 10:48:23 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/4caf5a0e/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
index e352034..1cec887 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
@@ -89,7 +89,7 @@ public class NvmfStorageClient implements StorageClient {
 		NvmfStorageConstants.printConf(logger);
 	}
 
-	public static Nvme getEndpointGroup() throws UnknownHostException {
+	private static Nvme getEndpointGroup() throws UnknownHostException {
 		if (nvme == null) {
 			nvme = new Nvme();
 		}


[13/16] incubator-crail git commit: NvmfStagingBufferCache: enhance exception message

Posted by pe...@apache.org.
NvmfStagingBufferCache: enhance exception message

Add values of illegal arguments to exception message to make
it easier for the user to debug.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/176bc39c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/176bc39c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/176bc39c

Branch: refs/heads/master
Commit: 176bc39cf59a30b85e69512c89fc7c5004e137a8
Parents: 34e0af4
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 11:00:06 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../crail/storage/nvmf/client/NvmfStagingBufferCache.java     | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/176bc39c/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
index dcfc411..fdc0a69 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
@@ -43,10 +43,10 @@ public class NvmfStagingBufferCache {
 
 	NvmfStagingBufferCache(CrailBufferCache bufferCache, int maxEntries, int lbaDataSize) {
 		if (maxEntries <= 0) {
-			throw new IllegalArgumentException("maximum entries <= 0");
+			throw new IllegalArgumentException("maximum entries (" + maxEntries + ") <= 0");
 		}
 		if (lbaDataSize <= 0) {
-			throw new IllegalArgumentException("LBA data size <= 0");
+			throw new IllegalArgumentException("LBA data size (" + lbaDataSize + ") <= 0");
 		}
 		this.remoteAddressMap = new ConcurrentHashMap<>(maxEntries);
 		this.freeBuffers = new ArrayBlockingQueue<>(maxEntries);
@@ -78,7 +78,8 @@ public class NvmfStagingBufferCache {
 			throw new OutOfMemoryError();
 		}
 		if (buffer.capacity() < lbaDataSize) {
-			throw new IllegalArgumentException("Slice size smaller LBA data size");
+			throw new IllegalArgumentException("Slice size (" + buffer.capacity() + ") smaller LBA data size (" +
+					lbaDataSize + ")");
 		}
 		int numStagingBuffers = buffer.remaining() / lbaDataSize;
 		while (numStagingBuffers-- > 0 && buffersLeft > 0) {


[04/16] incubator-crail git commit: NvmfStorageTier: fix to work with new jNVMf API

Posted by pe...@apache.org.
NvmfStorageTier: fix to work with new jNVMf API

Minor changes to work with latest jNVMf version.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/b54e114b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/b54e114b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/b54e114b

Branch: refs/heads/master
Commit: b54e114bb180f28ba428d30b11f66f4b7e9919e4
Parents: 29be91d
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Wed Apr 4 16:32:01 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:02 2018 +0200

----------------------------------------------------------------------
 .../apache/crail/storage/nvmf/NvmfStorageServer.java  |  4 ++--
 .../apache/crail/storage/nvmf/client/NvmfFuture.java  |  2 +-
 .../storage/nvmf/client/NvmfStorageEndpoint.java      | 14 +++++++-------
 3 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/b54e114b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
index f8b0d8c..b15cdc9 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
@@ -70,8 +70,8 @@ public class NvmfStorageServer implements StorageServer {
 					" at controller " + transportId.toString());
 		}
 		IdentifyNamespaceData namespaceData = namespace.getIdentifyNamespaceData();
-		LBAFormat lbaFormat = namespaceData.getFormattedLBASize();
-		int dataSize = lbaFormat.getLBADataSize().toInt();
+		LbaFormat lbaFormat = namespaceData.getFormattedLbaSize();
+		int dataSize = lbaFormat.getLbaDataSize().toInt();
 		long namespaceSize = dataSize * namespaceData.getNamespaceCapacity();
 		alignedSize = namespaceSize - (namespaceSize % NvmfStorageConstants.ALLOCATION_SIZE);
 		address = 0;

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/b54e114b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
index 0a6c9b4..639f568 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public class NvmfFuture<Command extends NvmIOCommand<? extends NvmIOCommandCapsule>> implements StorageFuture, OperationCallback {
+public class NvmfFuture<Command extends NvmIoCommand<? extends NvmIoCommandCapsule>> implements StorageFuture, OperationCallback {
 	private final NvmfStorageEndpoint endpoint;
 	private final Command command;
 	private final Queue<Command> operations;

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/b54e114b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index e1430af..1661349 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -45,7 +45,7 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 	private static final Logger LOG = CrailUtils.getLogger();
 
 	private final Controller controller;
-	private final IOQueuePair queuePair;
+	private final IoQueuePair queuePair;
 	private final int lbaDataSize;
 	private final long namespaceCapacity;
 	private final NvmfRegisteredBufferCache registeredBufferCache;
@@ -93,13 +93,13 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 					" at controller " + transportId.toString());
 		}
 		IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData();
-		lbaDataSize = identifyNamespaceData.getFormattedLBASize().getLBADataSize().toInt();
+		lbaDataSize = identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
 		if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
 			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
 					" is not a multiple of LBA data size (" + lbaDataSize + ")");
 		}
 		namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
-		this.queuePair = controller.createIOQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
+		this.queuePair = controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
 				SubmissionQueueEntry.SIZE);
 
 		this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
@@ -179,7 +179,7 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 			} while (!tryGetOperation());
 		}
 
-		NvmIOCommand<? extends NvmIOCommandCapsule> command;
+		NvmIoCommand<? extends NvmIoCommandCapsule> command;
 		NvmfFuture<?> future;
 		Response<NvmResponseCapsule> response;
 		if (op == Operation.READ) {
@@ -196,16 +196,16 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 		command.setCallback(future);
 		response.setCallback(future);
 
-		NvmIOCommandSQE sqe = command.getCommandCapsule().getSubmissionQueueEntry();
+		NvmIoCommandSqe sqe = command.getCommandCapsule().getSubmissionQueueEntry();
 		long startingLBA = startingAddress / getLBADataSize();
-		sqe.setStartingLBA(startingLBA);
+		sqe.setStartingLba(startingLBA);
 		/* TODO: on read this potentially overwrites data beyond the set limit */
 		short numLogicalBlocks = (short)(getNumLogicalBlocks(buffer) - 1);
 		sqe.setNumberOfLogicalBlocks(numLogicalBlocks);
 		KeyedNativeBuffer registeredBuffer = registeredBufferCache.get(buffer);
 		registeredBuffer.position(buffer.position());
 		registeredBuffer.limit(registeredBuffer.position() + (numLogicalBlocks + 1) * getLBADataSize());
-		command.getCommandCapsule().setSGLDescriptor(registeredBuffer);
+		command.getCommandCapsule().setSglDescriptor(registeredBuffer);
 
 		command.execute(response);
 


[03/16] incubator-crail git commit: NvmfStagingBufferCache: fix allocation

Posted by pe...@apache.org.
NvmfStagingBufferCache: fix allocation

Fix allocation of Crail buffers to slice the correct number
of buffers for staging.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/3845c273
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/3845c273
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/3845c273

Branch: refs/heads/master
Commit: 3845c273be15ef7405132d71dad8e3faee9f5590
Parents: b54e114
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Mon Apr 9 15:13:57 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:02 2018 +0200

----------------------------------------------------------------------
 .../apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/3845c273/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
index b4f4dc3..dcfc411 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
@@ -80,7 +80,8 @@ public class NvmfStagingBufferCache {
 		if (buffer.capacity() < lbaDataSize) {
 			throw new IllegalArgumentException("Slice size smaller LBA data size");
 		}
-		while (buffer.remaining() >= lbaDataSize && buffersLeft > 0) {
+		int numStagingBuffers = buffer.remaining() / lbaDataSize;
+		while (numStagingBuffers-- > 0 && buffersLeft > 0) {
 			buffer.limit(buffer.position() + lbaDataSize);
 			freeBuffers.add(buffer.slice());
 			buffer.position(buffer.limit());


[10/16] incubator-crail git commit: NvmfUnalignedWriteFuture: refactoring

Posted by pe...@apache.org.
NvmfUnalignedWriteFuture: refactoring

Use offsetInSector in floorToSectorSize and provide a new
floorToSectorSize overload for ints.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/89820d9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/89820d9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/89820d9a

Branch: refs/heads/master
Commit: 89820d9a379f40890f27f0f0054990cb955f1e13
Parents: 4a303c8
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 11:08:50 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/89820d9a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
index 08eab5b..d4b16b2 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
@@ -43,7 +43,11 @@ public class NvmfUnalignedWriteFuture implements StorageFuture {
 	}
 
 	private final long floorToSectorSize(long address) {
-		return address - (address % endpoint.getLBADataSize());
+		return address - offsetInSector(address);
+	}
+
+	private final int floorToSectorSize(int length) {
+		return length - offsetInSector(length);
 	}
 
 	private final int leftInSector(long address) {
@@ -91,7 +95,7 @@ public class NvmfUnalignedWriteFuture implements StorageFuture {
 		/* middle */
 		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
 			int oldLimit = buffer.limit();
-			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
+			buffer.limit(buffer.position() + floorToSectorSize(buffer.remaining()));
 			int toWrite = buffer.remaining();
 			middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
 			nextRemoteOffset += toWrite;


[11/16] incubator-crail git commit: NvmfStorageClient: isValid -> isAlive

Posted by pe...@apache.org.
NvmfStorageClient: isValid -> isAlive

Rename the method from isValid to isAlive. isValid can be
misinterpreted by users as meaning that they misconfigured the system.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/a9162a28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/a9162a28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/a9162a28

Branch: refs/heads/master
Commit: a9162a28a6e189a0c56f4916d0f3f63dec59dfdf
Parents: 4caf5a0
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Tue Apr 10 10:51:08 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:03 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/crail/storage/nvmf/NvmfStorageClient.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/a9162a28/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
index 1cec887..09a8048 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
@@ -68,7 +68,7 @@ public class NvmfStorageClient implements StorageClient {
 		});
 	}
 
-	boolean isValid() {
+	boolean isAlive() {
 		return keepAliveThread.isAlive();
 	}
 
@@ -97,8 +97,8 @@ public class NvmfStorageClient implements StorageClient {
 	}
 
 	public synchronized StorageEndpoint createEndpoint(DataNodeInfo info) throws IOException {
-		if (!isValid()) {
-			throw new IOException("Storage client state not valid");
+		if (!isAlive()) {
+			throw new IOException("Storage client is not alive");
 		}
 		NvmfStorageEndpoint endpoint = new NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache);
 		endpoints.add(endpoint);


[07/16] incubator-crail git commit: NvmfStorageEndpoint: too many outstanding requests

Posted by pe...@apache.org.
NvmfStorageEndpoint: too many outstanding requests

Fix bug where there could be too many outstanding requests posted
on the NVMf SQ.

Signed-off-by: Jonas Pfefferle <pe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/50cf2683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/50cf2683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/50cf2683

Branch: refs/heads/master
Commit: 50cf26837db41534c39ea75bd7a361e22d7648b8
Parents: feb765c
Author: Jonas Pfefferle <pe...@apache.org>
Authored: Mon Apr 9 15:20:38 2018 +0200
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Tue Apr 10 13:19:02 2018 +0200

----------------------------------------------------------------------
 .../org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/50cf2683/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index c9b17de..8188d82 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -144,7 +144,7 @@ public class NvmfStorageEndpoint implements StorageEndpoint {
 
 	private boolean tryGetOperation() {
 		int outstandingOperationsOld = outstandingOperations.get();
-		if (outstandingOperationsOld != NvmfStorageConstants.QUEUE_SIZE) {
+		if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE) {
 			return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1);
 		}
 		return false;