Posted to dev@crail.apache.org by PepperJo <gi...@git.apache.org> on 2018/04/09 13:27:16 UTC

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

GitHub user PepperJo opened a pull request:

    https://github.com/apache/incubator-crail/pull/16

    New NVMf storage based on jNVMf library

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/PepperJo/incubator-crail nvmf_fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-crail/pull/16.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16
    
----
commit 0d7b344f7b682b12effbc2454cc0a1d15c499cbf
Author: Jonas Pfefferle <pe...@...>
Date:   2018-03-01T16:29:34Z

    New NVMf storage tier: use jNVMf library
    
    New NVMf storage tier implementation which uses the jNVMf library instead
    of SPDK. We no longer implement unaligned reads, since we believe the
    semantics of the underlying storage system should not be hidden this
    way. We still guarantee good performance when using buffered streams,
    since they now try to align accesses whenever possible.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>
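Buffered streams are said to align accesses whenever possible. As a rough illustration of that idea (the class `AlignedRange` and its methods are hypothetical, not Crail or jNVMf API), a stream could widen each access to whole sectors like this, assuming non-negative offsets:

```java
/**
 * Hypothetical helper, not part of Crail or jNVMf: widens an access to the
 * smallest sector-aligned range that covers it.
 */
final class AlignedRange {
    final long start; // inclusive, multiple of sectorSize
    final long end;   // exclusive, multiple of sectorSize

    private AlignedRange(long start, long end) {
        this.start = start;
        this.end = end;
    }

    static AlignedRange cover(long offset, long length, int sectorSize) {
        if (offset < 0 || length < 0 || sectorSize <= 0) {
            throw new IllegalArgumentException("offset/length must be >= 0, sectorSize > 0");
        }
        long start = offset - (offset % sectorSize);                     // round down
        long limit = offset + length;
        long end = ((limit + sectorSize - 1) / sectorSize) * sectorSize; // round up
        return new AlignedRange(start, end);
    }

    /** True if the access already falls on sector boundaries and can be issued unchanged. */
    static boolean isAligned(long offset, long length, int sectorSize) {
        return offset % sectorSize == 0 && length % sectorSize == 0;
    }
}
```

For example, a 100-byte access at offset 1000 on 512-byte sectors is widened to [512, 1536).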

commit 9d846a9e88897a689d5efb0965f54d261580e6f9
Author: Jonas Pfefferle <pe...@...>
Date:   2018-04-04T14:32:01Z

    NvmfStorageTier: fix to work with new jNVMf API
    
    Minor changes to work with the latest jNVMf version.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>

commit 6609e3a1ac8f8200c41d019e29e1cc35e130dc63
Author: Jonas Pfefferle <pe...@...>
Date:   2018-04-09T13:13:57Z

    NvmfStagingBufferCache: fix allocation
    
    Fix allocation of Crail buffers so that the correct number
    of buffers is sliced for staging.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>

commit b0587e9ab1d986dbf13bfbf96580674081942c9f
Author: Jonas Pfefferle <pe...@...>
Date:   2018-04-09T13:16:49Z

    NvmfStorageClient: keep alive thread
    
    Start a keep-alive thread which sends a keep-alive message to
    each controller every 110s (timeout is 120s) and sleeps in between.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>
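A minimal sketch of the same keep-alive schedule built on `ScheduledExecutorService` instead of a hand-rolled sleep loop. `KeepAliveTarget` is a hypothetical stand-in for `NvmfStorageEndpoint.keepAlive()`. One deliberate difference from the PR's thread: a failing controller is logged and skipped rather than ending the keep-alive loop for all controllers.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for NvmfStorageEndpoint.keepAlive(). */
interface KeepAliveTarget {
    void keepAlive() throws IOException;
}

final class KeepAliveScheduler implements AutoCloseable {
    private final List<KeepAliveTarget> targets = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "nvmf-keep-alive");
                t.setDaemon(true); // do not keep the JVM alive just for keep-alives
                return t;
            });

    KeepAliveScheduler(long period, TimeUnit unit) {
        // Fixed delay: the next round starts `period` after the previous round finished.
        executor.scheduleWithFixedDelay(this::sendAll, 0, period, unit);
    }

    void register(KeepAliveTarget target) {
        targets.add(target);
    }

    private void sendAll() {
        for (KeepAliveTarget target : targets) {
            try {
                target.keepAlive();
            } catch (IOException e) {
                // Keep serving the remaining controllers instead of exiting.
                System.err.println("keep alive failed: " + e);
            }
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

With this sketch, `new KeepAliveScheduler(110, TimeUnit.SECONDS)` would reproduce the 110s interval against jNVMf's 120s default timeout, and `close()` stops it.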

commit 992add391fc33b8ed53eac8b80ad1b69bf2a3c98
Author: Jonas Pfefferle <pe...@...>
Date:   2018-04-09T13:18:40Z

    NvmfFuture: fix concurrency bug
    
    Fix bug where the completed counter was incremented concurrently
    and therefore never reached 2.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>
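The commit does not show the fix itself. Assuming the counter tracks two independent completion events (which two events is a guess), the usual remedy for lost updates on a concurrently incremented counter is an atomic increment, where only the caller that observes the final value completes the future. A minimal sketch with a hypothetical `TwoEventCompletion` class:

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch, not the PR's NvmfFuture: an operation that is done only
 * after two completion events, possibly signalled from different threads.
 */
final class TwoEventCompletion {
    private static final int EVENTS = 2;
    private final AtomicInteger completed = new AtomicInteger();

    /** Returns true for exactly one caller: the one whose event completes the operation. */
    boolean signal() {
        // incrementAndGet is a single atomic step, so two concurrent signals
        // cannot both read 0 and both write 1, as `completed++` on an int can.
        return completed.incrementAndGet() == EVENTS;
    }

    boolean isDone() {
        return completed.get() >= EVENTS;
    }
}
```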

commit f18c0d5479cf4a69e357a7ed2261725c8cdacc43
Author: Jonas Pfefferle <pe...@...>
Date:   2018-04-09T13:20:38Z

    NvmfStorageEndpoint: too many outstanding requests
    
    Fix bug where there could be too many outstanding requests posted
    on the NVMf SQ.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>
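One common way to enforce such a limit, sketched here under the assumption that `queueSize` is the number of usable submission queue slots, is to reserve a slot with a compare-and-set before posting and release it on completion. `OutstandingLimiter` is hypothetical; the endpoint in this PR does keep an `AtomicInteger outstandingOperations`, but its exact use is not shown here.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: cap in-flight commands at the submission queue depth. */
final class OutstandingLimiter {
    private final int queueSize;
    private final AtomicInteger outstanding = new AtomicInteger();

    OutstandingLimiter(int queueSize) {
        if (queueSize <= 0) throw new IllegalArgumentException("queueSize must be > 0");
        this.queueSize = queueSize;
    }

    /** Reserve a slot before posting a command; false means the SQ is full. */
    boolean tryAcquire() {
        while (true) {
            int current = outstanding.get();
            if (current >= queueSize) {
                return false; // caller should poll completions first, then retry
            }
            // Check and increment must be one atomic step; otherwise two threads
            // can both see queueSize - 1 and overfill the queue.
            if (outstanding.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Release the slot once the command's completion has been processed. */
    void release() {
        if (outstanding.decrementAndGet() < 0) {
            throw new IllegalStateException("release without acquire");
        }
    }

    int outstanding() {
        return outstanding.get();
    }
}
```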

commit 01ed9e18d049c8cf974872ec1876f58d9ce8cea1
Author: Jonas Pfefferle <pe...@...>
Date:   2018-04-09T13:22:41Z

    NvmfRegisteredBufferCache: simplify
    
    Simplify buffer cache by not holding references to CrailBuffers.
    We now only return the corresponding key instead of a wrapped
    KeyedNativeBuffer.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>
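To illustrate what caching only the key might look like, here is a hypothetical `RegisteredKeyCache` that maps each registered region's base address to its key and resolves any address inside that region, without holding a reference to the buffer. The region layout and the `int` key type are assumptions, not the PR's data structures.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

/**
 * Hypothetical sketch, not the PR's NvmfRegisteredBufferCache: remembers the
 * registration key of each memory region and looks it up by buffer address.
 */
final class RegisteredKeyCache {
    private static final class Region {
        final long length;
        final int key;

        Region(long length, int key) {
            this.length = length;
            this.key = key;
        }
    }

    // Sorted by region base address so floorEntry finds the candidate region.
    private final TreeMap<Long, Region> regions = new TreeMap<>();

    synchronized void put(long regionAddress, long regionLength, int key) {
        regions.put(regionAddress, new Region(regionLength, key));
    }

    /** Key of the region containing address, or empty if no such region is registered. */
    synchronized OptionalInt getKey(long address) {
        Map.Entry<Long, Region> e = regions.floorEntry(address);
        if (e == null || address >= e.getKey() + e.getValue().length) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(e.getValue().key);
    }
}
```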

commit 6dd85d185cbfaa6ec52f1c5b85b8862fba533748
Author: Jonas Pfefferle <pe...@...>
Date:   2018-04-09T13:24:41Z

    NvmfStorageClient: number of LBAs
    
    The number of LBAs in the new jNVMf API is no longer 0-based.
    
    Signed-off-by: Jonas Pfefferle <pe...@apache.org>
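For context: in the NVMe Read/Write command itself, the Number of Logical Blocks (NLB) field is a 0's based value, so a one-block transfer is encoded as 0. The commit suggests the new jNVMf API takes the plain count and presumably performs this conversion internally (an assumption). A hypothetical `LbaMath` helper showing both values:

```java
/** Hypothetical helper: LBA counts for a transfer of `length` bytes. */
final class LbaMath {
    /** Plain (1-based) count, as the new API apparently expects. */
    static int numberOfLbas(long length, int lbaDataSize) {
        if (lbaDataSize <= 0) throw new IllegalArgumentException("lbaDataSize must be > 0");
        if (length <= 0 || length % lbaDataSize != 0) {
            throw new IllegalArgumentException("length must be a positive multiple of the LBA data size");
        }
        long n = length / lbaDataSize;
        if (n > Integer.MAX_VALUE) throw new IllegalArgumentException("transfer too large");
        return (int) n;
    }

    /** Value of the NVMe NLB field, which is 0's based: 0 means one block. */
    static int zeroBasedNlb(long length, int lbaDataSize) {
        return numberOfLbas(length, lbaDataSize) - 1;
    }
}
```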

----


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180319133
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Copyright (C) 2018, IBM Corporation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    +	}
    +
    +	private final int leftInSector(long address) {
    +		return endpoint.getLBADataSize() - offsetInSector(address);
    +	}
    +
    +	private final int offsetInSector(long address) {
    +		return (int)(address % endpoint.getLBADataSize());
    +	}
    +
    +	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
    +		this.endpoint = endpoint;
    +		this.written = buffer.remaining();
    +		/* assume blockInfo.getAddr() is sector aligned */
    +		assert isSectorAligned(blockInfo.getAddr());
    +
    +		long nextRemoteOffset = remoteOffset;
    +		/* beginning */
    +		if (!isSectorAligned(remoteOffset)) {
    +			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
    +			nextRemoteOffset = remoteOffset + copySize;
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + copySize);
    +			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
    +			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
    +			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
    +			if (beginBuffer == null) {
    +				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
    +				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
    +				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
    +			} else {
    +				/* Wait for previous end operation to finish */
    +				beginBuffer.getFuture().get();
    +			}
    +			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			buffer.limit(oldLimit);
    +			stagingBuffer.position(0);
    +			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
    +			beginBuffer.setFuture(beginFuture);
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +		}
    +
    +		/* middle */
    +		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
    +			int toWrite = buffer.remaining();
    +			middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
    +			nextRemoteOffset += toWrite;
    +			buffer.position(buffer.limit());
    +			buffer.limit(oldLimit);
    +		}
    +
    +		/* end */
    +		if (buffer.remaining() > 0) {
    +			endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + nextRemoteOffset);
    +			CrailBuffer stagingBuffer = endBuffer.getBuffer();
    +			stagingBuffer.position(0);
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			stagingBuffer.position(0);
    +			endFuture = endpoint.write(stagingBuffer, blockInfo, nextRemoteOffset);
    +			endBuffer.setFuture(endFuture);
    +		}
    +	}
    +
    +	@Override
    +	public boolean isSynchronous() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean cancel(boolean b) {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isCancelled() {
    +		return false;
    +	}
    +
    +	private static boolean checkIfFutureIsDone(StorageFuture future) {
    --- End diff --
    
    Will there be cases when future == null implies done? 
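One reading of the constructor above: a segment (begin, middle or end) is only issued when needed, so its future can stay `null`, and a `null` future would then reasonably count as done. A minimal sketch of that convention, written against `java.util.concurrent.Future` and assuming `StorageFuture` is compatible with it (`SegmentFutures` is hypothetical):

```java
import java.util.concurrent.Future;

/** Hypothetical helpers for futures of optional write segments. */
final class SegmentFutures {
    /** A null future means the segment was never issued, so there is nothing to wait for. */
    static boolean isDone(Future<?> future) {
        return future == null || future.isDone();
    }

    /** True once every issued segment has completed. */
    static boolean allDone(Future<?>... futures) {
        for (Future<?> f : futures) {
            if (!isDone(f)) {
                return false;
            }
        }
        return true;
    }
}
```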


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180331213
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java ---
    @@ -89,31 +76,31 @@ public static void updateConstants(CrailConfiguration conf) throws UnknownHostEx
     
     		arg = get(conf, NQN_KEY);
     		if (arg != null) {
    -			NQN = arg;
    +			NQN = new NvmeQualifiedName(arg);
     		}
     
     		arg = get(conf, ALLOCATION_SIZE_KEY);
     		if (arg != null) {
     			ALLOCATION_SIZE = Long.parseLong(arg);
     		}
     
    -		arg = get(conf, SERVER_MEMPOOL_KEY);
    +		arg = get(conf, QUEUE_SIZE_KEY);
     		if (arg != null) {
    -			SERVER_MEMPOOL = Long.parseLong(arg);
    +			QUEUE_SIZE = Integer.parseInt(arg);
     		}
     
    -		arg = get(conf, CLIENT_MEMPOOL_KEY);
    +		arg = get(conf, STAGING_CACHE_SIZE_KEY);
     		if (arg != null) {
    -			CLIENT_MEMPOOL = Long.parseLong(arg);
    +			STAGING_CACHE_SIZE = Integer.parseInt(arg);
     		}
     	}
     
    -	public static void verify() throws IOException {
    -		if (NAMESPACE <= 0){
    -			throw new IOException("Namespace must be > 0");
    -		}
    +	public static void verify() {
     		if (ALLOCATION_SIZE % CrailConstants.BLOCK_SIZE != 0){
    -			throw new IOException("allocationsize must be multiple of crail.blocksize");
    +			throw new IllegalArgumentException("allocationsize must be multiple of crail.blocksize");
    --- End diff --
    
    Will do.


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180318375
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    --- End diff --
    
    Use your own function (offsetInSector)? i.e. "return address - offsetInSector(address);"
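A sketch of the suggested refactoring, lifted out of `NvmfUnalignedWriteFuture` into a standalone (hypothetical) `SectorMath` class so it compiles on its own; `lbaDataSize` stands in for `endpoint.getLBADataSize()`, and addresses are assumed non-negative:

```java
/** Hypothetical standalone version of the sector helpers in NvmfUnalignedWriteFuture. */
final class SectorMath {
    private final int lbaDataSize;

    SectorMath(int lbaDataSize) {
        if (lbaDataSize <= 0) throw new IllegalArgumentException("lbaDataSize must be > 0");
        this.lbaDataSize = lbaDataSize;
    }

    int offsetInSector(long address) {
        return (int) (address % lbaDataSize);
    }

    // Reuses offsetInSector instead of repeating the modulo.
    long floorToSectorSize(long address) {
        return address - offsetInSector(address);
    }

    boolean isSectorAligned(long address) {
        return offsetInSector(address) == 0;
    }

    int leftInSector(long address) {
        return lbaDataSize - offsetInSector(address);
    }
}
```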


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180316603
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java ---
    @@ -19,208 +19,229 @@
     
     package org.apache.crail.storage.nvmf.client;
     
    -import com.ibm.disni.nvmef.NvmeCommand;
    -import com.ibm.disni.nvmef.NvmeEndpoint;
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.IOCompletion;
    -
    +import com.ibm.jnvmf.*;
     import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.CrailStatistics;
     import org.apache.crail.conf.CrailConstants;
    -import org.apache.crail.memory.BufferCache;
     import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.metadata.DataNodeInfo;
     import org.apache.crail.storage.StorageEndpoint;
     import org.apache.crail.storage.StorageFuture;
    -import org.apache.crail.storage.nvmf.NvmfBufferCache;
     import org.apache.crail.storage.nvmf.NvmfStorageConstants;
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
     import java.io.IOException;
    +import java.net.InetAddress;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    -import java.net.URISyntaxException;
    -import java.util.concurrent.*;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicInteger;
     
     public class NvmfStorageEndpoint implements StorageEndpoint {
     	private static final Logger LOG = CrailUtils.getLogger();
     
    -	private final InetSocketAddress inetSocketAddress;
    -	private final NvmeEndpoint endpoint;
    -	private final int sectorSize;
    -	private final BufferCache cache;
    -	private final BlockingQueue<NvmeCommand> freeCommands;
    -	private final NvmeCommand[] commands;
    -	private final NvmfStorageFuture[] futures;
    -	private final ThreadLocal<long[]> completed;
    -	private final int ioQeueueSize;
    -
    -	public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException {
    -		this.inetSocketAddress = inetSocketAddress;
    -		endpoint = group.createEndpoint();
    +	private final Controller controller;
    +	private final IoQueuePair queuePair;
    +	private final int lbaDataSize;
    +	private final long namespaceCapacity;
    +	private final NvmfRegisteredBufferCache registeredBufferCache;
    +	private final NvmfStagingBufferCache stagingBufferCache;
    +	private final CrailStatistics statistics;
    +
    +	private final Queue<NvmWriteCommand> writeCommands;
    +	private final Queue<NvmReadCommand> readCommands;
    +
    +	private final AtomicInteger outstandingOperations;
    +
    +	public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics,
    +							   CrailBufferCache bufferCache) throws IOException {
    +		InetSocketAddress inetSocketAddress = new InetSocketAddress(
    +				InetAddress.getByAddress(info.getIpAddress()), info.getPort());
    +		// XXX FIXME: nsid from datanodeinfo
    +		NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress,
    +				new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
    +		LOG.info("Connecting to NVMf target at " + transportId.toString());
    +		controller = nvme.connect(transportId);
    +		controller.getControllerConfiguration().setEnable(true);
    +		controller.syncConfiguration();
     		try {
    -			URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
    -					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1");
    -			LOG.info("Connecting to " + url.toString());
    -			endpoint.connect(url);
    -		} catch (URISyntaxException e) {
    -			//FIXME
    -			e.printStackTrace();
    +			controller.waitUntilReady();
    +		} catch (TimeoutException e) {
    +			throw new IOException(e);
     		}
    -		sectorSize = endpoint.getSectorSize();
    -		cache = new NvmfBufferCache();
    -		ioQeueueSize = endpoint.getIOQueueSize();
    -		freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
    -		commands = new NvmeCommand[ioQeueueSize];
    -		for (int i = 0; i < ioQeueueSize; i++) {
    -			NvmeCommand command = endpoint.newCommand();
    -			command.setId(i);
    -			commands[i] = command;
    -			freeCommands.add(command);
    +		IdentifyControllerData identifyControllerData = controller.getIdentifyControllerData();
    +		if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" +
    --- End diff --
    
    :+1: 


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180312940
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java ---
    @@ -31,41 +30,92 @@
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.NvmeTransportType;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.List;
    +import java.util.concurrent.CopyOnWriteArrayList;
    +import java.util.concurrent.TimeUnit;
     
     public class NvmfStorageClient implements StorageClient {
     	private static final Logger LOG = CrailUtils.getLogger();
    -	private static NvmeEndpointGroup clientGroup;
    -	private boolean initialized = false;
    +	private static Nvme nvme;
    +	private boolean initialized;
    +	private volatile boolean closing;
    +	private final Thread keepAliveThread;
    +	private List<NvmfStorageEndpoint> endpoints;
    +	private CrailStatistics statistics;
    +	private CrailBufferCache bufferCache;
    +
    +	public NvmfStorageClient() {
    +		this.initialized = false;
    +		this.endpoints = new CopyOnWriteArrayList<>();
    +		this.closing = false;
    +		this.keepAliveThread = new Thread(() -> {
    +			while (!closing) {
    +				for (NvmfStorageEndpoint endpoint : endpoints) {
    +					try {
    +						endpoint.keepAlive();
    +					} catch (IOException e) {
    +						e.printStackTrace();
    +						return;
    +					}
    +				}
    +				/* We use the default keep alive timer of 120s in jNVMf */
    +				try {
    +					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
    +				} catch (InterruptedException e) {
    +					return;
    +				}
    +			}
    +		});
    +	}
    +
    +	boolean isValid() {
    --- End diff --
    
    [weak suggestion] Can we keep this function as "isAlive"? "isValid" suggests something is wrong with the configuration.


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180332038
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    --- End diff --
    
    Will do.


---

[GitHub] incubator-crail issue #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on the issue:

    https://github.com/apache/incubator-crail/pull/16
  
    Addressed all the changes requested by Animesh.


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180319636
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    +	}
    +
    +	private final int leftInSector(long address) {
    +		return endpoint.getLBADataSize() - offsetInSector(address);
    +	}
    +
    +	private final int offsetInSector(long address) {
    +		return (int)(address % endpoint.getLBADataSize());
    +	}
    +
    +	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
    +		this.endpoint = endpoint;
    +		this.written = buffer.remaining();
    +		/* assume blockInfo.getAddr() is sector aligned */
    +		assert isSectorAligned(blockInfo.getAddr());
    +
    +		long nextRemoteOffset = remoteOffset;
    +		/* beginning */
    +		if (!isSectorAligned(remoteOffset)) {
    +			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
    +			nextRemoteOffset = remoteOffset + copySize;
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + copySize);
    +			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
    +			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
    +			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
    +			if (beginBuffer == null) {
    +				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
    +				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
    +				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
    +			} else {
    +				/* Wait for previous end operation to finish */
    +				beginBuffer.getFuture().get();
    +			}
    +			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			buffer.limit(oldLimit);
    +			stagingBuffer.position(0);
    +			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
    +			beginBuffer.setFuture(beginFuture);
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +		}
    +
    +		/* middle */
    +		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
    --- End diff --
    
    this is just abusing the function ;)
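`floorToSectorSize` is written for addresses but is used here to round a length. A hypothetical sketch with a dedicated length helper, which also spells out how the constructor above splits a write into begin, middle and end parts (offsets assumed non-negative):

```java
/** Hypothetical sketch: how an unaligned write of `length` bytes at `offset` splits up. */
final class WriteSplit {
    final int begin;   // bytes up to the next sector boundary, via a staging buffer
    final long middle; // whole sectors written directly from the user buffer
    final int end;     // trailing partial sector, again via a staging buffer

    private WriteSplit(int begin, long middle, int end) {
        this.begin = begin;
        this.middle = middle;
        this.end = end;
    }

    // Dedicated helper for a *length*, instead of reusing the address helper.
    static long wholeSectorBytes(long length, int sectorSize) {
        return (length / sectorSize) * sectorSize;
    }

    static WriteSplit of(long offset, int length, int sectorSize) {
        if (offset < 0 || length < 0 || sectorSize <= 0) {
            throw new IllegalArgumentException("bad arguments");
        }
        int inSector = (int) (offset % sectorSize);
        int begin = inSector == 0 ? 0 : Math.min(sectorSize - inSector, length);
        long middle = wholeSectorBytes(length - begin, sectorSize);
        int end = (int) (length - begin - middle);
        return new WriteSplit(begin, middle, end);
    }
}
```

For example, 2000 bytes at offset 1000 with 512-byte sectors split into 24 + 1536 + 440 bytes.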


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180313817
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java ---
    @@ -31,51 +28,68 @@
     
     import java.io.IOException;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    +import java.util.List;
     
     public class NvmfStorageServer implements StorageServer {
     	private static final Logger LOG = CrailUtils.getLogger();
     
     	private boolean isAlive;
     	private long alignedSize;
    -	private long offset;
    +	private long address;
     	private boolean initialized = false;
    -	private NvmeEndpoint endpoint;
    +	private Controller controller;
     
     	public NvmfStorageServer() {}
    -	
    +
     	public void init(CrailConfiguration crailConfiguration, String[] args) throws Exception {
     		if (initialized) {
     			throw new IOException("NvmfStorageTier already initialized");
     		}
     		initialized = true;
     		NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
     
    -		NvmeEndpointGroup group = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA}, NvmfStorageConstants.SERVER_MEMPOOL);
    -		endpoint = group.createEndpoint();
    -
    -		URI uri = new URI("nvmef://" + NvmfStorageConstants.IP_ADDR.getHostAddress() + ":" + NvmfStorageConstants.PORT +
    -					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=" + NvmfStorageConstants.NQN);
    -		endpoint.connect(uri);
    -
    -		long namespaceSize = endpoint.getNamespaceSize();
    +		Nvme nvme = new Nvme();
    +		NvmfTransportId transportId = new NvmfTransportId(
    +				new InetSocketAddress(NvmfStorageConstants.IP_ADDR, NvmfStorageConstants.PORT),
    +				NvmfStorageConstants.NQN);
    +		controller = nvme.connect(transportId);
    +		controller.getControllerConfiguration().setEnable(true);
    +		controller.syncConfiguration();
    +		controller.waitUntilReady();
    +
    +		List<Namespace> namespaces = controller.getActiveNamespaces();
    +		Namespace namespace = null;
    +		for (Namespace n : namespaces) {
    +			if (n.getIdentifier().equals(NvmfStorageConstants.NAMESPACE)) {
    +				namespace = n;
    +				break;
    +			}
    +		}
    +		if (namespace == null) {
    +			throw new IllegalArgumentException("No namespace with id " + NvmfStorageConstants.NAMESPACE +
    +					" at controller " + transportId.toString());
    +		}
    +		IdentifyNamespaceData namespaceData = namespace.getIdentifyNamespaceData();
    +		LbaFormat lbaFormat = namespaceData.getFormattedLbaSize();
    +		int dataSize = lbaFormat.getLbaDataSize().toInt();
    +		long namespaceSize = dataSize * namespaceData.getNamespaceCapacity();
     		alignedSize = namespaceSize - (namespaceSize % NvmfStorageConstants.ALLOCATION_SIZE);
    -		offset = 0;
    +		address = 0;
     
     		isAlive = true;
    -	}	
    +	}
     
     	@Override
     	public void printConf(Logger log) {
    -		NvmfStorageConstants.printConf(log);		
    +		NvmfStorageConstants.printConf(log);
     	}
     
     	public void run() {
     		LOG.info("NnvmfStorageServer started with NVMf target " + getAddress());
     		while (isAlive) {
     			try {
     				Thread.sleep(1000 /* ms */);
    --- End diff --
    
    Magic 1000 milliseconds :)
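A minimal sketch of one way to address the magic number, assuming the loop only needs to poll a liveness flag. The names `ServerLoopSketch` and `ALIVE_CHECK_INTERVAL_MS` are hypothetical and not part of Crail. The interval keeps the current value of one second.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: names are illustrative, not taken from Crail.
final class ServerLoopSketch {
    // Named interval instead of an inline literal; one second matches the current code.
    static final long ALIVE_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);

    private volatile boolean isAlive = true;

    void stop() {
        isAlive = false;
    }

    // Sleeps in ALIVE_CHECK_INTERVAL_MS steps until stop() is called or the thread is interrupted.
    void run() {
        while (isAlive) {
            try {
                Thread.sleep(ALIVE_CHECK_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
                return;
            }
        }
    }
}
```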


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180337495
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java ---
    @@ -19,208 +19,229 @@
     
     package org.apache.crail.storage.nvmf.client;
     
    -import com.ibm.disni.nvmef.NvmeCommand;
    -import com.ibm.disni.nvmef.NvmeEndpoint;
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.IOCompletion;
    -
    +import com.ibm.jnvmf.*;
     import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.CrailStatistics;
     import org.apache.crail.conf.CrailConstants;
    -import org.apache.crail.memory.BufferCache;
     import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.metadata.DataNodeInfo;
     import org.apache.crail.storage.StorageEndpoint;
     import org.apache.crail.storage.StorageFuture;
    -import org.apache.crail.storage.nvmf.NvmfBufferCache;
     import org.apache.crail.storage.nvmf.NvmfStorageConstants;
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
     import java.io.IOException;
    +import java.net.InetAddress;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    -import java.net.URISyntaxException;
    -import java.util.concurrent.*;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicInteger;
     
     public class NvmfStorageEndpoint implements StorageEndpoint {
     	private static final Logger LOG = CrailUtils.getLogger();
     
    -	private final InetSocketAddress inetSocketAddress;
    -	private final NvmeEndpoint endpoint;
    -	private final int sectorSize;
    -	private final BufferCache cache;
    -	private final BlockingQueue<NvmeCommand> freeCommands;
    -	private final NvmeCommand[] commands;
    -	private final NvmfStorageFuture[] futures;
    -	private final ThreadLocal<long[]> completed;
    -	private final int ioQeueueSize;
    -
    -	public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException {
    -		this.inetSocketAddress = inetSocketAddress;
    -		endpoint = group.createEndpoint();
    +	private final Controller controller;
    +	private final IoQueuePair queuePair;
    +	private final int lbaDataSize;
    +	private final long namespaceCapacity;
    +	private final NvmfRegisteredBufferCache registeredBufferCache;
    +	private final NvmfStagingBufferCache stagingBufferCache;
    +	private final CrailStatistics statistics;
    +
    +	private final Queue<NvmWriteCommand> writeCommands;
    +	private final Queue<NvmReadCommand> readCommands;
    +
    +	private final AtomicInteger outstandingOperations;
    +
    +	public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics,
    +							   CrailBufferCache bufferCache) throws IOException {
    +		InetSocketAddress inetSocketAddress = new InetSocketAddress(
    +				InetAddress.getByAddress(info.getIpAddress()), info.getPort());
    +		// XXX FIXME: nsid from datanodeinfo
    +		NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress,
    +				new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
    +		LOG.info("Connecting to NVMf target at " + transportId.toString());
    +		controller = nvme.connect(transportId);
    +		controller.getControllerConfiguration().setEnable(true);
    +		controller.syncConfiguration();
     		try {
    -			URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
    -					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1");
    -			LOG.info("Connecting to " + url.toString());
    -			endpoint.connect(url);
    -		} catch (URISyntaxException e) {
    -			//FIXME
    -			e.printStackTrace();
    +			controller.waitUntilReady();
    +		} catch (TimeoutException e) {
    +			throw new IOException(e);
     		}
    -		sectorSize = endpoint.getSectorSize();
    -		cache = new NvmfBufferCache();
    -		ioQeueueSize = endpoint.getIOQueueSize();
    -		freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
    -		commands = new NvmeCommand[ioQeueueSize];
    -		for (int i = 0; i < ioQeueueSize; i++) {
    -			NvmeCommand command = endpoint.newCommand();
    -			command.setId(i);
    -			commands[i] = command;
    -			freeCommands.add(command);
    +		IdentifyControllerData identifyControllerData = controller.getIdentifyControllerData();
    +		if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" +
    +					identifyControllerData.getMaximumDataTransferSize() + ")");
     		}
    -		futures = new NvmfStorageFuture[ioQeueueSize];
    -		completed = new ThreadLocal<long[]>() {
    -			public long[] initialValue() {
    -				return new long[ioQeueueSize];
    +		List<Namespace> namespaces = controller.getActiveNamespaces();
    +		//TODO: poll nsid in datanodeinfo
    +		NamespaceIdentifier namespaceIdentifier = new NamespaceIdentifier(1);
    +		Namespace namespace = null;
    +		for (Namespace n : namespaces) {
    +			if (n.getIdentifier().equals(namespaceIdentifier)) {
    +				namespace = n;
    +				break;
     			}
    -		};
    +		}
    +		if (namespace == null) {
    +			throw new IllegalArgumentException("No namespace with id " + namespaceIdentifier +
    +					" at controller " + transportId.toString());
    +		}
    +		IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData();
    +		lbaDataSize = identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
    +		if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
    +					" is not a multiple of LBA data size (" + lbaDataSize + ")");
    +		}
    +		namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
    +		this.queuePair = controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
    +				SubmissionQueueEntry.SIZE);
    +
    +		this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		this.readCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
    +			NvmWriteCommand writeCommand = new NvmWriteCommand(queuePair);
    +			writeCommand.setSendInline(true);
    +			writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			writeCommands.add(writeCommand);
    +			NvmReadCommand readCommand = new NvmReadCommand(queuePair);
    +			readCommand.setSendInline(true);
    +			readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			readCommands.add(readCommand);
    +		}
    +		this.registeredBufferCache = new NvmfRegisteredBufferCache(queuePair);
    +		this.outstandingOperations = new AtomicInteger(0);
    +		this.stagingBufferCache = new NvmfStagingBufferCache(bufferCache,
    +				NvmfStorageConstants.STAGING_CACHE_SIZE, getLBADataSize());
    +		this.statistics = statistics;
    +	}
    +
    +	public void keepAlive() throws IOException {
    +		controller.keepAlive();
    +	}
    +
    +	public int getLBADataSize() {
    +		return lbaDataSize;
     	}
     
    -	public int getSectorSize() {
    -		return sectorSize;
    +	public long getNamespaceCapacity() {
    +		return namespaceCapacity;
     	}
     
     	enum Operation {
     		WRITE,
    -		READ;
    +		READ
     	}
     
    -	public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset)
    -			throws IOException, InterruptedException {
    -		int length = buffer.remaining();
    -		if (length > CrailConstants.BLOCK_SIZE){
    -			throw new IOException("write size too large " + length);
    -		}
    -		if (length <= 0){
    -			throw new IOException("write size too small, len " + length);
    -		}
    -		if (buffer.position() < 0){
    -			throw new IOException("local offset too small " + buffer.position());
    -		}
    -		if (remoteOffset < 0){
    -			throw new IOException("remote offset too small " + remoteOffset);
    -		}
    +	void putOperation() {
    +		outstandingOperations.decrementAndGet();
    +	}
     
    -		if (remoteMr.getAddr() + remoteOffset + length > endpoint.getNamespaceSize()){
    -			long tmpAddr = remoteMr.getAddr() + remoteOffset + length;
    -			throw new IOException("remote fileOffset + remoteOffset + len = " + tmpAddr + " - size = " +
    -					endpoint.getNamespaceSize());
    +	private boolean tryGetOperation() {
    +		int outstandingOperationsOld = outstandingOperations.get();
    +		if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE) {
    +			return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1);
     		}
    +		return false;
    +	}
     
    -//		LOG.info("op = " + op.name() +
    -//				", position = " + buffer.position() +
    -//				", localOffset = " + buffer.position() +
    -//				", remoteOffset = " + remoteOffset +
    -//				", remoteAddr = " + remoteMr.getAddr() +
    -//				", length = " + length);
    -
    -		NvmeCommand command = freeCommands.poll();
    -		while (command == null) {
    -			poll();
    -			command = freeCommands.poll();
    -		}
    +	private static int divCeil(int a, int b) {
    +		return (a + b - 1) / b;
    +	}
     
    -		boolean aligned = NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
    -				&& NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
    -		long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, remoteOffset, sectorSize);
    -		StorageFuture future = null;
    -		if (aligned) {
    -//			LOG.info("aligned");
    -			command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
    -			switch(op) {
    -				case READ:
    -					command.read();
    -					break;
    -				case WRITE:
    -					command.write();
    -					break;
    -			}
    -			future = futures[(int)command.getId()] = new NvmfStorageFuture(this, length);
    -			command.execute();
    -		} else {
    -//			LOG.info("unaligned");
    -			long alignedLength = NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
    +	private int getNumLogicalBlocks(CrailBuffer buffer) {
    +		return divCeil(buffer.remaining(), getLBADataSize());
    +	}
     
    -			CrailBuffer stagingBuffer = cache.allocateBuffer();
    -			stagingBuffer.limit((int)alignedLength);
    +	StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException {
    +		assert blockInfo.getAddr() + remoteOffset + buffer.remaining() <= getNamespaceCapacity();
    --- End diff --
    
    Yes, this is meant for debugging purposes only.
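Java `assert` statements are skipped unless the JVM is started with assertions enabled (`-ea`), so the bounds check above costs nothing in normal runs. If an always-on check were preferred instead, it could look like the following sketch. `BoundsCheckSketch.checkBounds` is a hypothetical helper; its parameters stand in for `blockInfo.getAddr()`, `remoteOffset`, `buffer.remaining()` and `getNamespaceCapacity()`.

```java
import java.io.IOException;

// Hypothetical helper, not part of Crail: an always-on version of the debug assertion.
final class BoundsCheckSketch {
    static void checkBounds(long blockAddress, long remoteOffset, int length, long namespaceCapacity)
            throws IOException {
        long end = blockAddress + remoteOffset + length;
        // Reject negative offsets/lengths and accesses past the end of the namespace.
        if (remoteOffset < 0 || length < 0 || end > namespaceCapacity) {
            throw new IOException("access [" + (blockAddress + remoteOffset) + ", " + end
                    + ") exceeds namespace capacity " + namespaceCapacity);
        }
    }
}
```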


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180312351
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java ---
    @@ -31,41 +30,92 @@
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.NvmeTransportType;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.List;
    +import java.util.concurrent.CopyOnWriteArrayList;
    +import java.util.concurrent.TimeUnit;
     
     public class NvmfStorageClient implements StorageClient {
     	private static final Logger LOG = CrailUtils.getLogger();
    -	private static NvmeEndpointGroup clientGroup;
    -	private boolean initialized = false;
    +	private static Nvme nvme;
    +	private boolean initialized;
    +	private volatile boolean closing;
    +	private final Thread keepAliveThread;
    +	private List<NvmfStorageEndpoint> endpoints;
    +	private CrailStatistics statistics;
    +	private CrailBufferCache bufferCache;
    +
    +	public NvmfStorageClient() {
    +		this.initialized = false;
    +		this.endpoints = new CopyOnWriteArrayList<>();
    +		this.closing = false;
    +		this.keepAliveThread = new Thread(() -> {
    +			while (!closing) {
    +				for (NvmfStorageEndpoint endpoint : endpoints) {
    +					try {
    +						endpoint.keepAlive();
    +					} catch (IOException e) {
    +						e.printStackTrace();
    +						return;
    +					}
    +				}
    +				/* We use the default keep alive timer of 120s in jNVMf */
    +				try {
    +					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
    --- End diff --
    
    110 or 120 seconds (as the comment says)? Is this configurable?
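One way to make the interval configurable, sketched under assumptions: the keep-alive timeout is treated as a configuration value (no such Crail property exists in this diff), the 120 s default is taken from the comment in the code, and the sleep interval is the timeout minus a safety margin. A 10 s margin reproduces the current 110 s. `KeepAliveIntervalSketch` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: derives the keep-alive sleep from a (configurable) timeout
// instead of hard-coding 110 s next to a comment that mentions 120 s.
final class KeepAliveIntervalSketch {
    // Assumed default, taken from the code comment (jNVMf keep-alive timer of 120 s).
    static final long DEFAULT_KEEP_ALIVE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(120);

    // Sleep long enough to avoid needless traffic, but wake up before the timer expires.
    static long sleepIntervalMs(long keepAliveTimeoutMs, long safetyMarginMs) {
        if (keepAliveTimeoutMs <= 0) {
            throw new IllegalArgumentException("keep-alive timeout must be positive");
        }
        if (safetyMarginMs < 0 || safetyMarginMs >= keepAliveTimeoutMs) {
            throw new IllegalArgumentException("safety margin must be in [0, timeout)");
        }
        return keepAliveTimeoutMs - safetyMarginMs;
    }
}
```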


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180312653
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java ---
    @@ -31,41 +30,92 @@
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.NvmeTransportType;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.List;
    +import java.util.concurrent.CopyOnWriteArrayList;
    +import java.util.concurrent.TimeUnit;
     
     public class NvmfStorageClient implements StorageClient {
     	private static final Logger LOG = CrailUtils.getLogger();
    -	private static NvmeEndpointGroup clientGroup;
    -	private boolean initialized = false;
    +	private static Nvme nvme;
    +	private boolean initialized;
    +	private volatile boolean closing;
    +	private final Thread keepAliveThread;
    +	private List<NvmfStorageEndpoint> endpoints;
    +	private CrailStatistics statistics;
    +	private CrailBufferCache bufferCache;
    +
    +	public NvmfStorageClient() {
    +		this.initialized = false;
    +		this.endpoints = new CopyOnWriteArrayList<>();
    +		this.closing = false;
    +		this.keepAliveThread = new Thread(() -> {
    +			while (!closing) {
    +				for (NvmfStorageEndpoint endpoint : endpoints) {
    +					try {
    +						endpoint.keepAlive();
    +					} catch (IOException e) {
    +						e.printStackTrace();
    +						return;
    +					}
    +				}
    +				/* We use the default keep alive timer of 120s in jNVMf */
    +				try {
    +					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
    +				} catch (InterruptedException e) {
    +					return;
    +				}
    +			}
    +		});
    +	}
    +
    +	boolean isValid() {
    +		return keepAliveThread.isAlive();
    +	}
     
     	public void init(CrailStatistics statistics, CrailBufferCache bufferCache, CrailConfiguration crailConfiguration,
     					 String[] args) throws IOException {
     		if (initialized) {
     			throw new IOException("NvmfStorageTier already initialized");
     		}
     		initialized = true;
    -
    +		this.statistics = statistics;
    +		this.bufferCache = bufferCache;
    +		LOG.info("Initialize Nvmf storage client");
     		NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
    +		keepAliveThread.start();
     	}
     
     	public void printConf(Logger logger) {
     		NvmfStorageConstants.printConf(logger);
     	}
     
    -	public static NvmeEndpointGroup getEndpointGroup() {
    -		if (clientGroup == null) {
    -			clientGroup = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA},
    -					NvmfStorageConstants.CLIENT_MEMPOOL);
    +	public static Nvme getEndpointGroup() throws UnknownHostException {
    --- End diff --
    
    Does this public static function need protection against a race condition during initialization?
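If `getEndpointGroup()` lazily creates the shared static `Nvme` instance (the old code did this with `if (clientGroup == null)`; the new body is not visible in the quoted diff), two threads could both observe `null` and create two instances. The simplest fix is to declare the method `static synchronized`. The sketch below shows double-checked locking on a `volatile` field instead, with a generic `LazySingleton` (a hypothetical helper) standing in for the jNVMf type. A factory that throws `UnknownHostException` would have to wrap it, because `Supplier` cannot throw checked exceptions.

```java
import java.util.function.Supplier;

// Hypothetical sketch of thread-safe lazy initialization of a shared instance.
final class LazySingleton<T> {
    private final Supplier<T> factory;
    // volatile guarantees that a fully constructed instance is visible to all threads
    private volatile T instance;

    LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    // Double-checked locking: no lock is taken once the instance exists.
    T get() {
        T result = instance;
        if (result == null) {
            synchronized (this) {
                result = instance;
                if (result == null) {
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }
}
```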


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180338259
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Copyright (C) 2018, IBM Corporation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    +	}
    +
    +	private final int leftInSector(long address) {
    +		return endpoint.getLBADataSize() - offsetInSector(address);
    +	}
    +
    +	private final int offsetInSector(long address) {
    +		return (int)(address % endpoint.getLBADataSize());
    +	}
    +
    +	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
    +		this.endpoint = endpoint;
    +		this.written = buffer.remaining();
    +		/* assume blockInfo.getAddr() is sector aligned */
    +		assert isSectorAligned(blockInfo.getAddr());
    +
    +		long nextRemoteOffset = remoteOffset;
    +		/* beginning */
    +		if (!isSectorAligned(remoteOffset)) {
    +			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
    +			nextRemoteOffset = remoteOffset + copySize;
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + copySize);
    +			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
    +			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
    +			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
    +			if (beginBuffer == null) {
    +				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
    +				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
    +				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
    +			} else {
    +				/* Wait for previous end operation to finish */
    +				beginBuffer.getFuture().get();
    +			}
    +			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			buffer.limit(oldLimit);
    +			stagingBuffer.position(0);
    +			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
    +			beginBuffer.setFuture(beginFuture);
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +		}
    +
    +		/* middle */
    +		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
    +			int toWrite = buffer.remaining();
    +			middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
    +			nextRemoteOffset += toWrite;
    +			buffer.position(buffer.limit());
    +			buffer.limit(oldLimit);
    +		}
    +
    +		/* end */
    +		if (buffer.remaining() > 0) {
    +			endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + nextRemoteOffset);
    +			CrailBuffer stagingBuffer = endBuffer.getBuffer();
    +			stagingBuffer.position(0);
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			stagingBuffer.position(0);
    +			endFuture = endpoint.write(stagingBuffer, blockInfo, nextRemoteOffset);
    +			endBuffer.setFuture(endFuture);
    +		}
    +	}
    +
    +	@Override
    +	public boolean isSynchronous() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean cancel(boolean b) {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isCancelled() {
    +		return false;
    +	}
    +
    +	private static boolean checkIfFutureIsDone(StorageFuture future) {
    +		return (future != null && future.isDone()) || future == null;
    +	}
    +
    +	@Override
    +	public boolean isDone() {
    +		if (beginFuture != null && beginFuture.isDone()) {
    +			if (beginBuffer != null) {
    +				beginBuffer.put();
    +				beginBuffer = null;
    +			}
    +		}
    +		if (endFuture != null && endFuture.isDone()) {
    +			if (endBuffer != null) {
    +				endBuffer.put();
    +				endBuffer = null;
    +			}
    +		}
    +		return beginBuffer == null && checkIfFutureIsDone(middleFuture) && endBuffer == null;
    +	}
    +
    +	@Override
    +	public StorageResult get() throws InterruptedException, ExecutionException {
    +		try {
    +			return get(2, TimeUnit.MINUTES);
    +		} catch (TimeoutException e) {
    +			throw new ExecutionException(e);
    +		}
    +	}
    +
    +	@Override
    +	public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
    +		if (!isDone()) {
    +			long start = System.nanoTime();
    +			long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
    +			boolean waitTimeOut;
    +			do {
    +				waitTimeOut = System.nanoTime() > end;
    +			} while (!isDone() && !waitTimeOut);
    +			if (!isDone() && waitTimeOut) {
    +				throw new TimeoutException("poll wait time out!");
    +			}
    +		}
    +		if (beginFuture != null) {
    --- End diff --
    
    Any of the futures can be null. Depending on whether the beginning sector, the end sector, or both are unaligned, the operation can consist of a begin write only, an end write only, begin + middle, middle + end, begin + middle + end, and so on. See the constructor for the logic.
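The following sketch reproduces the split performed by the `NvmfUnalignedWriteFuture` constructor for a write of `length` bytes at `offset` (relative to a sector-aligned block address), so the possible combinations can be checked directly. The class name is hypothetical; a part with zero bytes corresponds to a future that stays `null`.

```java
// Hypothetical sketch mirroring the constructor's arithmetic for one LBA size.
final class UnalignedSplitSketch {
    // Returns {begin, middle, end} byte counts; 0 means that part is not issued.
    static int[] split(long offset, int length, int lbaSize) {
        int begin = 0;
        if (offset % lbaSize != 0) {
            // Begin: read-modify-write of the partially covered first sector.
            int leftInSector = lbaSize - (int) (offset % lbaSize);
            begin = Math.min(leftInSector, length);
        }
        int remaining = length - begin;
        // Middle: whole sectors only, written directly from the user buffer.
        int middle = remaining - (remaining % lbaSize);
        // End: leftover bytes of a final partial sector, written via a staging buffer.
        int end = remaining - middle;
        return new int[] {begin, middle, end};
    }
}
```

For example, with 512-byte LBAs a 500-byte write at offset 100 yields begin = 412, middle = 0, end = 88 (begin + end), and a 1024-byte write at offset 0 yields only a middle part.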


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180320719
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Copyright (C) 2018, IBM Corporation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    +	}
    +
    +	private final int leftInSector(long address) {
    +		return endpoint.getLBADataSize() - offsetInSector(address);
    +	}
    +
    +	private final int offsetInSector(long address) {
    +		return (int)(address % endpoint.getLBADataSize());
    +	}
    +
    +	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
    +		this.endpoint = endpoint;
    +		this.written = buffer.remaining();
    +		/* assume blockInfo.getAddr() is sector aligned */
    +		assert isSectorAligned(blockInfo.getAddr());
    +
    +		long nextRemoteOffset = remoteOffset;
    +		/* beginning */
    +		if (!isSectorAligned(remoteOffset)) {
    +			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
    +			nextRemoteOffset = remoteOffset + copySize;
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + copySize);
    +			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
    +			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
    +			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
    +			if (beginBuffer == null) {
    +				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
    +				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
    +				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
    +			} else {
    +				/* Wait for previous end operation to finish */
    +				beginBuffer.getFuture().get();
    +			}
    +			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			buffer.limit(oldLimit);
    +			stagingBuffer.position(0);
    +			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
    +			beginBuffer.setFuture(beginFuture);
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +		}
    +
    +		/* middle */
    +		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
    +			int toWrite = buffer.remaining();
    +			middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
    +			nextRemoteOffset += toWrite;
    +			buffer.position(buffer.limit());
    +			buffer.limit(oldLimit);
    +		}
    +
    +		/* end */
    +		if (buffer.remaining() > 0) {
    +			endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + nextRemoteOffset);
    +			CrailBuffer stagingBuffer = endBuffer.getBuffer();
    +			stagingBuffer.position(0);
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			stagingBuffer.position(0);
    +			endFuture = endpoint.write(stagingBuffer, blockInfo, nextRemoteOffset);
    +			endBuffer.setFuture(endFuture);
    +		}
    +	}
    +
    +	@Override
    +	public boolean isSynchronous() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean cancel(boolean b) {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isCancelled() {
    +		return false;
    +	}
    +
    +	private static boolean checkIfFutureIsDone(StorageFuture future) {
    +		return (future != null && future.isDone()) || future == null;
    +	}
    +
    +	@Override
    +	public boolean isDone() {
    +		if (beginFuture != null && beginFuture.isDone()) {
    +			if (beginBuffer != null) {
    +				beginBuffer.put();
    +				beginBuffer = null;
    +			}
    +		}
    +		if (endFuture != null && endFuture.isDone()) {
    +			if (endBuffer != null) {
    +				endBuffer.put();
    +				endBuffer = null;
    +			}
    +		}
    +		return beginBuffer == null && checkIfFutureIsDone(middleFuture) && endBuffer == null;
    +	}
    +
    +	@Override
    +	public StorageResult get() throws InterruptedException, ExecutionException {
    +		try {
    +			return get(2, TimeUnit.MINUTES);
    +		} catch (TimeoutException e) {
    +			throw new ExecutionException(e);
    +		}
    +	}
    +
    +	@Override
    +	public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
    +		if (!isDone()) {
    +			long start = System.nanoTime();
    +			long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
    +			boolean waitTimeOut;
    +			do {
    +				waitTimeOut = System.nanoTime() > end;
    +			} while (!isDone() && !waitTimeOut);
    +			if (!isDone() && waitTimeOut) {
    +				throw new TimeoutException("poll wait time out!");
    +			}
    +		}
    +		if (beginFuture != null) {
    --- End diff --
    
    I don't understand the logic here. When would beginFuture (or the others) be null?
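The `null` checks follow from the author's reply earlier in this thread: a part of the write that was never issued has no future. `checkIfFutureIsDone(f)` is equivalent to `f == null || f.isDone()`, and the same rule can be applied to all parts at once, as in this sketch. `PartialFuturesSketch.allDone` is a hypothetical helper; unlike the real `isDone()`, it does not release staging buffers.

```java
import java.util.concurrent.Future;

// Hypothetical helper: a part that was never issued (null future) counts as complete,
// so the combined write is done once every issued part is done.
final class PartialFuturesSketch {
    static boolean allDone(Future<?>... parts) {
        for (Future<?> part : parts) {
            if (part != null && !part.isDone()) {
                return false;
            }
        }
        return true;
    }
}
```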


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180337795
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java ---
    @@ -19,208 +19,229 @@
     
     package org.apache.crail.storage.nvmf.client;
     
    -import com.ibm.disni.nvmef.NvmeCommand;
    -import com.ibm.disni.nvmef.NvmeEndpoint;
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.IOCompletion;
    -
    +import com.ibm.jnvmf.*;
     import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.CrailStatistics;
     import org.apache.crail.conf.CrailConstants;
    -import org.apache.crail.memory.BufferCache;
     import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.metadata.DataNodeInfo;
     import org.apache.crail.storage.StorageEndpoint;
     import org.apache.crail.storage.StorageFuture;
    -import org.apache.crail.storage.nvmf.NvmfBufferCache;
     import org.apache.crail.storage.nvmf.NvmfStorageConstants;
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
     import java.io.IOException;
    +import java.net.InetAddress;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    -import java.net.URISyntaxException;
    -import java.util.concurrent.*;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicInteger;
     
     public class NvmfStorageEndpoint implements StorageEndpoint {
     	private static final Logger LOG = CrailUtils.getLogger();
     
    -	private final InetSocketAddress inetSocketAddress;
    -	private final NvmeEndpoint endpoint;
    -	private final int sectorSize;
    -	private final BufferCache cache;
    -	private final BlockingQueue<NvmeCommand> freeCommands;
    -	private final NvmeCommand[] commands;
    -	private final NvmfStorageFuture[] futures;
    -	private final ThreadLocal<long[]> completed;
    -	private final int ioQeueueSize;
    -
    -	public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException {
    -		this.inetSocketAddress = inetSocketAddress;
    -		endpoint = group.createEndpoint();
    +	private final Controller controller;
    +	private final IoQueuePair queuePair;
    +	private final int lbaDataSize;
    +	private final long namespaceCapacity;
    +	private final NvmfRegisteredBufferCache registeredBufferCache;
    +	private final NvmfStagingBufferCache stagingBufferCache;
    +	private final CrailStatistics statistics;
    +
    +	private final Queue<NvmWriteCommand> writeCommands;
    +	private final Queue<NvmReadCommand> readCommands;
    +
    +	private final AtomicInteger outstandingOperations;
    +
    +	public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics,
    +							   CrailBufferCache bufferCache) throws IOException {
    +		InetSocketAddress inetSocketAddress = new InetSocketAddress(
    +				InetAddress.getByAddress(info.getIpAddress()), info.getPort());
    +		// XXX FIXME: nsid from datanodeinfo
    +		NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress,
    +				new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
    +		LOG.info("Connecting to NVMf target at " + transportId.toString());
    +		controller = nvme.connect(transportId);
    +		controller.getControllerConfiguration().setEnable(true);
    +		controller.syncConfiguration();
     		try {
    -			URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
    -					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1");
    -			LOG.info("Connecting to " + url.toString());
    -			endpoint.connect(url);
    -		} catch (URISyntaxException e) {
    -			//FIXME
    -			e.printStackTrace();
    +			controller.waitUntilReady();
    +		} catch (TimeoutException e) {
    +			throw new IOException(e);
     		}
    -		sectorSize = endpoint.getSectorSize();
    -		cache = new NvmfBufferCache();
    -		ioQeueueSize = endpoint.getIOQueueSize();
    -		freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
    -		commands = new NvmeCommand[ioQeueueSize];
    -		for (int i = 0; i < ioQeueueSize; i++) {
    -			NvmeCommand command = endpoint.newCommand();
    -			command.setId(i);
    -			commands[i] = command;
    -			freeCommands.add(command);
    +		IdentifyControllerData identifyControllerData = controller.getIdentifyControllerData();
    +		if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" +
    +					identifyControllerData.getMaximumDataTransferSize() + ")");
     		}
    -		futures = new NvmfStorageFuture[ioQeueueSize];
    -		completed = new ThreadLocal<long[]>() {
    -			public long[] initialValue() {
    -				return new long[ioQeueueSize];
    +		List<Namespace> namespaces = controller.getActiveNamespaces();
    +		//TODO: poll nsid in datanodeinfo
    +		NamespaceIdentifier namespaceIdentifier = new NamespaceIdentifier(1);
    +		Namespace namespace = null;
    +		for (Namespace n : namespaces) {
    +			if (n.getIdentifier().equals(namespaceIdentifier)) {
    +				namespace = n;
    +				break;
     			}
    -		};
    +		}
    +		if (namespace == null) {
    +			throw new IllegalArgumentException("No namespace with id " + namespaceIdentifier +
    +					" at controller " + transportId.toString());
    +		}
    +		IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData();
    +		lbaDataSize = identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
    +		if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
    +					" is not a multiple of LBA data size (" + lbaDataSize + ")");
    +		}
    +		namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
    +		this.queuePair = controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
    +				SubmissionQueueEntry.SIZE);
    +
    +		this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		this.readCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
    +			NvmWriteCommand writeCommand = new NvmWriteCommand(queuePair);
    +			writeCommand.setSendInline(true);
    +			writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			writeCommands.add(writeCommand);
    +			NvmReadCommand readCommand = new NvmReadCommand(queuePair);
    +			readCommand.setSendInline(true);
    +			readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			readCommands.add(readCommand);
    +		}
    +		this.registeredBufferCache = new NvmfRegisteredBufferCache(queuePair);
    +		this.outstandingOperations = new AtomicInteger(0);
    +		this.stagingBufferCache = new NvmfStagingBufferCache(bufferCache,
    +				NvmfStorageConstants.STAGING_CACHE_SIZE, getLBADataSize());
    +		this.statistics = statistics;
    +	}
    +
    +	public void keepAlive() throws IOException {
    +		controller.keepAlive();
    +	}
    +
    +	public int getLBADataSize() {
    +		return lbaDataSize;
     	}
     
    -	public int getSectorSize() {
    -		return sectorSize;
    +	public long getNamespaceCapacity() {
    +		return namespaceCapacity;
     	}
     
     	enum Operation {
     		WRITE,
    -		READ;
    +		READ
     	}
     
    -	public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset)
    -			throws IOException, InterruptedException {
    -		int length = buffer.remaining();
    -		if (length > CrailConstants.BLOCK_SIZE){
    -			throw new IOException("write size too large " + length);
    -		}
    -		if (length <= 0){
    -			throw new IOException("write size too small, len " + length);
    -		}
    -		if (buffer.position() < 0){
    -			throw new IOException("local offset too small " + buffer.position());
    -		}
    -		if (remoteOffset < 0){
    -			throw new IOException("remote offset too small " + remoteOffset);
    -		}
    +	void putOperation() {
    +		outstandingOperations.decrementAndGet();
    +	}
     
    -		if (remoteMr.getAddr() + remoteOffset + length > endpoint.getNamespaceSize()){
    -			long tmpAddr = remoteMr.getAddr() + remoteOffset + length;
    -			throw new IOException("remote fileOffset + remoteOffset + len = " + tmpAddr + " - size = " +
    -					endpoint.getNamespaceSize());
    +	private boolean tryGetOperation() {
    +		int outstandingOperationsOld = outstandingOperations.get();
    +		if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE) {
    +			return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1);
     		}
    +		return false;
    +	}
     
    -//		LOG.info("op = " + op.name() +
    -//				", position = " + buffer.position() +
    -//				", localOffset = " + buffer.position() +
    -//				", remoteOffset = " + remoteOffset +
    -//				", remoteAddr = " + remoteMr.getAddr() +
    -//				", length = " + length);
    -
    -		NvmeCommand command = freeCommands.poll();
    -		while (command == null) {
    -			poll();
    -			command = freeCommands.poll();
    -		}
    +	private static int divCeil(int a, int b) {
    +		return (a + b - 1) / b;
    +	}
     
    -		boolean aligned = NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
    -				&& NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
    -		long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, remoteOffset, sectorSize);
    -		StorageFuture future = null;
    -		if (aligned) {
    -//			LOG.info("aligned");
    -			command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
    -			switch(op) {
    -				case READ:
    -					command.read();
    -					break;
    -				case WRITE:
    -					command.write();
    -					break;
    -			}
    -			future = futures[(int)command.getId()] = new NvmfStorageFuture(this, length);
    -			command.execute();
    -		} else {
    -//			LOG.info("unaligned");
    -			long alignedLength = NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
    +	private int getNumLogicalBlocks(CrailBuffer buffer) {
    +		return divCeil(buffer.remaining(), getLBADataSize());
    +	}
     
    -			CrailBuffer stagingBuffer = cache.allocateBuffer();
    -			stagingBuffer.limit((int)alignedLength);
    +	StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException {
    +		assert blockInfo.getAddr() + remoteOffset + buffer.remaining() <= getNamespaceCapacity();
    +		assert remoteOffset >= 0;
    +		assert buffer.remaining() <= CrailConstants.BLOCK_SIZE;
    +
    +		long startingAddress = blockInfo.getAddr() + remoteOffset;
    +		if (startingAddress % getLBADataSize() != 0 ||
    +				((startingAddress + buffer.remaining()) % getLBADataSize() != 0 && op == Operation.WRITE)) {
    +			if (op == Operation.READ) {
    +				throw new IOException("Unaligned read access is not supported. Address (" + startingAddress +
    +						") needs to be multiple of LBA data size " + getLBADataSize());
    +			}
     			try {
    -				switch(op) {
    -					case READ: {
    -						NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, (int)alignedLength);
    -						command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
    -						future = new NvmfStorageUnalignedReadFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer);
    -						break;
    -					}
    -					case WRITE: {
    -						if (NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0) {
    -							// Do not read if the offset is aligned to sector size
    -							int sizeToWrite = length;
    -							stagingBuffer.put(buffer.getByteBuffer());
    -							stagingBuffer.position(0);
    -							command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).write().execute();
    -							future = futures[(int)command.getId()] = new NvmfStorageUnalignedWriteFuture(this, sizeToWrite, stagingBuffer);
    -						} else {
    -							// RMW but append only file system allows only reading last sector
    -							// and dir entries are sector aligned
    -							stagingBuffer.limit(sectorSize);
    -							NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, sectorSize);
    -							command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
    -							future = new NvmfStorageUnalignedRMWFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer);
    -						}
    -						break;
    -					}
    -				}
    -			} catch (NoSuchFieldException e) {
    -				throw new IOException(e);
    -			} catch (IllegalAccessException e) {
    +				return new NvmfUnalignedWriteFuture(this, buffer, blockInfo, remoteOffset);
    +			} catch (Exception e) {
     				throw new IOException(e);
     			}
     		}
     
    +		if (!tryGetOperation()) {
    +			do {
    +				poll();
    +			} while (!tryGetOperation());
    +		}
    +
    +		NvmIoCommand<? extends NvmIoCommandCapsule> command;
    +		NvmfFuture<?> future;
    +		Response<NvmResponseCapsule> response;
    +		if (op == Operation.READ) {
    +			NvmReadCommand readCommand = readCommands.remove();
    +			response = readCommand.newResponse();
    +			future = new NvmfFuture<>(this, readCommand, response, readCommands, buffer.remaining());
    +			command = readCommand;
    +		} else {
    +			NvmWriteCommand writeCommand = writeCommands.remove();
    +			response = writeCommand.newResponse();
    +			future = new NvmfFuture<>(this, writeCommand, response, writeCommands, buffer.remaining());
    +			command = writeCommand;
    +		}
    +		command.setCallback(future);
    +		response.setCallback(future);
    +
    +		NvmIoCommandSqe sqe = command.getCommandCapsule().getSubmissionQueueEntry();
    +		long startingLBA = startingAddress / getLBADataSize();
    +		sqe.setStartingLba(startingLBA);
    +		/* TODO: on read this potentially overwrites data beyond the set limit */
    +		short numLogicalBlocks = (short)getNumLogicalBlocks(buffer);
    --- End diff --
    
    Not anymore. Previously the lower-level API used shorts (since that is the maximum supported size). Will change.
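
Some background on the `(short)` cast. In the NVMe Read and Write commands, the Number of Logical Blocks (NLB) field is 16 bits wide and 0's based, so up to 65536 blocks can be encoded. A Java `short` tops out at 32767, however, and a narrowing cast wraps silently. A minimal sketch follows, with hypothetical names; it is not taken from the PR or from jNVMf, and it makes no claim about how jNVMf's setter encodes the value. It keeps the count as an `int` and range-checks it before encoding:

```java
// Hypothetical helper, not jNVMf API: computes the logical block count for a
// transfer and checks it against the 16-bit, 0's based NLB field of NVMe I/O commands.
final class LogicalBlockCount {
    // NLB is 0's based: the encoded value 0xFFFF means 65536 blocks.
    static final int MAX_LOGICAL_BLOCKS = 1 << 16;

    static int divCeil(int a, int b) {
        // widen to long so that a + b - 1 cannot overflow for large a
        return (int) ((a + (long) b - 1) / b);
    }

    static int numLogicalBlocks(int bytes, int lbaDataSize) {
        if (bytes <= 0 || lbaDataSize <= 0) {
            throw new IllegalArgumentException("bytes (" + bytes + ") and LBA data size ("
                    + lbaDataSize + ") must be positive");
        }
        int blocks = divCeil(bytes, lbaDataSize);
        if (blocks > MAX_LOGICAL_BLOCKS) {
            throw new IllegalArgumentException(blocks + " logical blocks exceed the maximum of "
                    + MAX_LOGICAL_BLOCKS);
        }
        return blocks;
    }

    // Value as stored in the 0's based NLB field: 1 block is encoded as 0.
    static int toNlbField(int blocks) {
        return blocks - 1;
    }
}
```

With 512-byte LBAs, a 1 MiB slice needs 2048 blocks and fits comfortably. Any transfer of 32768 blocks or more (16 MiB at 512 bytes) would turn negative under a plain `(short)` cast.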


---

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180315936
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Copyright (C) 2018, IBM Corporation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.storage.StorageFuture;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +public class NvmfStagingBufferCache {
    +	private final Map<Long, BufferCacheEntry> remoteAddressMap;
    +	private final Queue<CrailBuffer> freeBuffers;
    +	private int buffersLeft;
    +	private final int lbaDataSize;
    +	private final CrailBufferCache bufferCache;
    +
    +	private final CrailBufferCache getBufferCache() {
    +		return bufferCache;
    +	}
    +
    +	NvmfStagingBufferCache(CrailBufferCache bufferCache, int maxEntries, int lbaDataSize) {
    +		if (maxEntries <= 0) {
    +			throw new IllegalArgumentException("maximum entries <= 0");
    +		}
    +		if (lbaDataSize <= 0) {
    +			throw new IllegalArgumentException("LBA data size <= 0");
    +		}
    +		this.remoteAddressMap = new ConcurrentHashMap<>(maxEntries);
    +		this.freeBuffers = new ArrayBlockingQueue<>(maxEntries);
    +		this.buffersLeft = maxEntries;
    +		this.lbaDataSize = lbaDataSize;
    +		this.bufferCache = bufferCache;
    +	}
    +
    +	synchronized void allocateFreeBuffers() throws Exception {
    +		if (!freeBuffers.isEmpty()) {
    +			return;
    +		}
    +		if (buffersLeft == 0) {
    +			/* TODO: make sure this happens rarely */
    +			Iterator<BufferCacheEntry> iterator = remoteAddressMap.values().iterator();
    +			while (iterator.hasNext()) {
    +				BufferCacheEntry currentEntry = iterator.next();
    +				if (currentEntry.tryFree()) {
    +					iterator.remove();
    +					freeBuffers.add(currentEntry.getBuffer());
    +					return;
    +				}
    +			}
    +			throw new OutOfMemoryError();
    +		}
    +
    +		CrailBuffer buffer = getBufferCache().allocateBuffer();
    +		if (buffer == null) {
    +			throw new OutOfMemoryError();
    +		}
    +		if (buffer.capacity() < lbaDataSize) {
    --- End diff --
    
    Please print them so that the user knows what went wrong.
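
A minimal sketch of an error message that reports both numbers. The helper name is hypothetical, and using `IllegalArgumentException` is an assumption, because the excerpt ends before the PR's actual `throw`:

```java
// Hypothetical check, not the PR's code: report both values when a Crail
// buffer is too small to stage a single LBA.
final class StagingBufferCheck {
    static void checkCapacity(int bufferCapacity, int lbaDataSize) {
        if (bufferCapacity < lbaDataSize) {
            throw new IllegalArgumentException("Crail buffer capacity (" + bufferCapacity
                    + " bytes) is smaller than the LBA data size (" + lbaDataSize
                    + " bytes); increase the Crail buffer size");
        }
    }
}
```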


---

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180339324
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java ---
    @@ -31,41 +30,92 @@
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.NvmeTransportType;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.List;
    +import java.util.concurrent.CopyOnWriteArrayList;
    +import java.util.concurrent.TimeUnit;
     
     public class NvmfStorageClient implements StorageClient {
     	private static final Logger LOG = CrailUtils.getLogger();
    -	private static NvmeEndpointGroup clientGroup;
    -	private boolean initialized = false;
    +	private static Nvme nvme;
    +	private boolean initialized;
    +	private volatile boolean closing;
    +	private final Thread keepAliveThread;
    +	private List<NvmfStorageEndpoint> endpoints;
    +	private CrailStatistics statistics;
    +	private CrailBufferCache bufferCache;
    +
    +	public NvmfStorageClient() {
    +		this.initialized = false;
    +		this.endpoints = new CopyOnWriteArrayList<>();
    +		this.closing = false;
    +		this.keepAliveThread = new Thread(() -> {
    +			while (!closing) {
    +				for (NvmfStorageEndpoint endpoint : endpoints) {
    +					try {
    +						endpoint.keepAlive();
    +					} catch (IOException e) {
    +						e.printStackTrace();
    +						return;
    +					}
    +				}
    +				/* We use the default keep alive timer of 120s in jNVMf */
    +				try {
    +					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
    +				} catch (InterruptedException e) {
    +					return;
    +				}
    +			}
    +		});
    +	}
    +
    +	boolean isValid() {
    +		return keepAliveThread.isAlive();
    +	}
     
     	public void init(CrailStatistics statistics, CrailBufferCache bufferCache, CrailConfiguration crailConfiguration,
     					 String[] args) throws IOException {
     		if (initialized) {
     			throw new IOException("NvmfStorageTier already initialized");
     		}
     		initialized = true;
    -
    +		this.statistics = statistics;
    +		this.bufferCache = bufferCache;
    +		LOG.info("Initialize Nvmf storage client");
     		NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
    +		keepAliveThread.start();
     	}
     
     	public void printConf(Logger logger) {
     		NvmfStorageConstants.printConf(logger);
     	}
     
    -	public static NvmeEndpointGroup getEndpointGroup() {
    -		if (clientGroup == null) {
    -			clientGroup = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA},
    -					NvmfStorageConstants.CLIENT_MEMPOOL);
    +	public static Nvme getEndpointGroup() throws UnknownHostException {
    --- End diff --
    
    This should not be called from anywhere outside this class, so I will make it private. It is protected in createEndpoint (synchronized).
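
One subtlety, in case `createEndpoint` is a `synchronized` instance method (its body is not shown in this excerpt) while the lazily created `Nvme` stays `static`. In that case the lock is per client object, so two client instances could still race on the static field. Below is a generic sketch of a private, class-locked lazy initializer. `Resource` stands in for jNVMf's `Nvme`, and all names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: lazily create one shared static resource, guarded by the class lock.
final class LazyStaticHolder {
    // Counts constructions, for illustration only.
    static final AtomicInteger CREATED = new AtomicInteger();

    static final class Resource {
        Resource() {
            CREATED.incrementAndGet();
        }
    }

    private static Resource resource; // guarded by LazyStaticHolder.class

    // Private: only reachable through createEndpoint(), as suggested in the review.
    private static synchronized Resource getResource() {
        if (resource == null) {
            resource = new Resource();
        }
        return resource;
    }

    // Instance method; because the static helper takes the class lock, calls on
    // different instances still share (and create at most once) the same Resource.
    Resource createEndpoint() {
        return getResource();
    }
}
```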


---

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180331658
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java ---
    @@ -31,51 +28,68 @@
     
     import java.io.IOException;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    +import java.util.List;
     
     public class NvmfStorageServer implements StorageServer {
     	private static final Logger LOG = CrailUtils.getLogger();
     
     	private boolean isAlive;
     	private long alignedSize;
    -	private long offset;
    +	private long address;
     	private boolean initialized = false;
    -	private NvmeEndpoint endpoint;
    +	private Controller controller;
     
     	public NvmfStorageServer() {}
    -	
    +
     	public void init(CrailConfiguration crailConfiguration, String[] args) throws Exception {
     		if (initialized) {
     			throw new IOException("NvmfStorageTier already initialized");
     		}
     		initialized = true;
     		NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
     
    -		NvmeEndpointGroup group = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA}, NvmfStorageConstants.SERVER_MEMPOOL);
    -		endpoint = group.createEndpoint();
    -
    -		URI uri = new URI("nvmef://" + NvmfStorageConstants.IP_ADDR.getHostAddress() + ":" + NvmfStorageConstants.PORT +
    -					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=" + NvmfStorageConstants.NQN);
    -		endpoint.connect(uri);
    -
    -		long namespaceSize = endpoint.getNamespaceSize();
    +		Nvme nvme = new Nvme();
    +		NvmfTransportId transportId = new NvmfTransportId(
    +				new InetSocketAddress(NvmfStorageConstants.IP_ADDR, NvmfStorageConstants.PORT),
    +				NvmfStorageConstants.NQN);
    +		controller = nvme.connect(transportId);
    +		controller.getControllerConfiguration().setEnable(true);
    +		controller.syncConfiguration();
    +		controller.waitUntilReady();
    +
    +		List<Namespace> namespaces = controller.getActiveNamespaces();
    +		Namespace namespace = null;
    +		for (Namespace n : namespaces) {
    +			if (n.getIdentifier().equals(NvmfStorageConstants.NAMESPACE)) {
    +				namespace = n;
    +				break;
    +			}
    +		}
    +		if (namespace == null) {
    +			throw new IllegalArgumentException("No namespace with id " + NvmfStorageConstants.NAMESPACE +
    +					" at controller " + transportId.toString());
    +		}
    +		IdentifyNamespaceData namespaceData = namespace.getIdentifyNamespaceData();
    +		LbaFormat lbaFormat = namespaceData.getFormattedLbaSize();
    +		int dataSize = lbaFormat.getLbaDataSize().toInt();
    +		long namespaceSize = dataSize * namespaceData.getNamespaceCapacity();
     		alignedSize = namespaceSize - (namespaceSize % NvmfStorageConstants.ALLOCATION_SIZE);
    -		offset = 0;
    +		address = 0;
     
     		isAlive = true;
    -	}	
    +	}
     
     	@Override
     	public void printConf(Logger log) {
    -		NvmfStorageConstants.printConf(log);		
    +		NvmfStorageConstants.printConf(log);
     	}
     
     	public void run() {
     		LOG.info("NnvmfStorageServer started with NVMf target " + getAddress());
     		while (isAlive) {
     			try {
     				Thread.sleep(1000 /* ms */);
    --- End diff --
    
    Let me make the keep-alive time a static final constant to be used by both the storage server and the client.
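
A sketch of such a shared constant. It assumes the 120 s jNVMf default mentioned in the client's comment and the 10 s safety margin implied by the client's 110 s sleep. The class and field names are hypothetical:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical constants holder shared by NvmfStorageClient and NvmfStorageServer.
final class NvmfKeepAliveConstants {
    private NvmfKeepAliveConstants() {}

    // Keep-alive timeout, assumed to match the jNVMf default of 120 s.
    static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(120);

    // Send keep-alives 10 s before the timeout would expire.
    static final long INTERVAL_MS = TIMEOUT_MS - TimeUnit.SECONDS.toMillis(10);
}
```

The client's keep-alive thread and the server loop could then both sleep for `NvmfKeepAliveConstants.INTERVAL_MS`, so the interval is defined in one place.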


---

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180320774
  
    --- Diff: storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java ---
    @@ -0,0 +1,73 @@
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.conf.CrailConfiguration;
    +import org.apache.crail.conf.CrailConstants;
    +import org.apache.crail.memory.MappedBufferCache;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +import static org.junit.Assert.*;
    +
    +public class NvmfStagingBufferCacheTest {
    --- End diff --
    
    :+1: 


---

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180331153
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java ---
    @@ -31,41 +30,92 @@
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.NvmeTransportType;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.List;
    +import java.util.concurrent.CopyOnWriteArrayList;
    +import java.util.concurrent.TimeUnit;
     
     public class NvmfStorageClient implements StorageClient {
     	private static final Logger LOG = CrailUtils.getLogger();
    -	private static NvmeEndpointGroup clientGroup;
    -	private boolean initialized = false;
    +	private static Nvme nvme;
    +	private boolean initialized;
    +	private volatile boolean closing;
    +	private final Thread keepAliveThread;
    +	private List<NvmfStorageEndpoint> endpoints;
    +	private CrailStatistics statistics;
    +	private CrailBufferCache bufferCache;
    +
    +	public NvmfStorageClient() {
    +		this.initialized = false;
    +		this.endpoints = new CopyOnWriteArrayList<>();
    +		this.closing = false;
    +		this.keepAliveThread = new Thread(() -> {
    +			while (!closing) {
    +				for (NvmfStorageEndpoint endpoint : endpoints) {
    +					try {
    +						endpoint.keepAlive();
    +					} catch (IOException e) {
    +						e.printStackTrace();
    +						return;
    +					}
    +				}
    +				/* We use the default keep alive timer of 120s in jNVMf */
    +				try {
    +					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
    +				} catch (InterruptedException e) {
    +					return;
    +				}
    +			}
    +		});
    +	}
    +
    +	boolean isValid() {
    --- End diff --
    
    Makes sense. Will change.


---

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180332615
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Copyright (C) 2018, IBM Corporation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    +	}
    +
    +	private final int leftInSector(long address) {
    +		return endpoint.getLBADataSize() - offsetInSector(address);
    +	}
    +
    +	private final int offsetInSector(long address) {
    +		return (int)(address % endpoint.getLBADataSize());
    +	}
    +
    +	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
    +		this.endpoint = endpoint;
    +		this.written = buffer.remaining();
    +		/* assume blockInfo.getAddr() is sector aligned */
    +		assert isSectorAligned(blockInfo.getAddr());
    +
    +		long nextRemoteOffset = remoteOffset;
    +		/* beginning */
    +		if (!isSectorAligned(remoteOffset)) {
    +			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
    +			nextRemoteOffset = remoteOffset + copySize;
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + copySize);
    +			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
    +			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
    +			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
    +			if (beginBuffer == null) {
    +				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
    +				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
    +				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
    +			} else {
    +				/* Wait for previous end operation to finish */
    +				beginBuffer.getFuture().get();
    +			}
    +			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			buffer.limit(oldLimit);
    +			stagingBuffer.position(0);
    +			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
    +			beginBuffer.setFuture(beginFuture);
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +		}
    +
    +		/* middle */
    +		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
    --- End diff --
    
    Can you elaborate?
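For context on the line being asked about: `floorToSectorSize` was written for addresses, but here it is applied to `buffer.remaining()`, a byte count. The effect is to round the bytes left after the unaligned "begin" piece down to a whole number of LBAs; those go out as the direct "middle" write, and whatever is left becomes the staged "end" piece. The sketch below shows only that arithmetic. `SectorSplit` and its fields are hypothetical names for illustration, not code from the PR.

```java
// Hypothetical illustration (not code from the PR): how an unaligned write of
// `length` bytes at byte `offset` splits into head / middle / tail pieces.
final class SectorSplit {
    final int head;   // bytes up to the next sector boundary (staged read-modify-write)
    final int middle; // whole sectors written straight from the caller's buffer
    final int tail;   // leftover bytes after the last whole sector (staged)

    private SectorSplit(int head, int middle, int tail) {
        this.head = head;
        this.middle = middle;
        this.tail = tail;
    }

    static SectorSplit of(long offset, int length, int sectorSize) {
        int offsetInSector = (int) (offset % sectorSize);
        // Only an unaligned start needs a head piece, and it never exceeds the data we have.
        int head = offsetInSector == 0 ? 0 : Math.min(sectorSize - offsetInSector, length);
        int rest = length - head;
        // Equivalent of floorToSectorSize(buffer.remaining()): round a byte
        // count (not an address) down to a multiple of the sector size.
        int middle = rest - (rest % sectorSize);
        int tail = rest - middle;
        return new SectorSplit(head, middle, tail);
    }
}
```

For example, with 512-byte LBAs a 1000-byte write at offset 100 splits into a 412-byte head, one 512-byte middle sector and a 76-byte tail.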


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180331005
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java ---
    @@ -31,41 +30,92 @@
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.NvmeTransportType;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.List;
    +import java.util.concurrent.CopyOnWriteArrayList;
    +import java.util.concurrent.TimeUnit;
     
     public class NvmfStorageClient implements StorageClient {
     	private static final Logger LOG = CrailUtils.getLogger();
    -	private static NvmeEndpointGroup clientGroup;
    -	private boolean initialized = false;
    +	private static Nvme nvme;
    +	private boolean initialized;
    +	private volatile boolean closing;
    +	private final Thread keepAliveThread;
    +	private List<NvmfStorageEndpoint> endpoints;
    +	private CrailStatistics statistics;
    +	private CrailBufferCache bufferCache;
    +
    +	public NvmfStorageClient() {
    +		this.initialized = false;
    +		this.endpoints = new CopyOnWriteArrayList<>();
    +		this.closing = false;
    +		this.keepAliveThread = new Thread(() -> {
    +			while (!closing) {
    +				for (NvmfStorageEndpoint endpoint : endpoints) {
    +					try {
    +						endpoint.keepAlive();
    +					} catch (IOException e) {
    +						e.printStackTrace();
    +						return;
    +					}
    +				}
    +				/* We use the default keep alive timer of 120s in jNVMf */
    +				try {
    +					Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
    --- End diff --
    
    For now, the keep-alive timer is not configurable. We picked 110 s rather than 120 s because we don't want to be late: if we waited the full 120 s, the timer might already have expired, especially since we have to send keep-alive messages to all connected endpoints.
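One way to make the margin reasoning explicit is to derive the period from the timeout and schedule rounds at a fixed rate. In the diff, the thread sleeps 110 s after each round, so the real gap per endpoint is 110 s plus the time the round took; a fixed-rate schedule measures from the start of each round instead. The sketch below is hypothetical: `KeepAliveTarget` stands in for `NvmfStorageEndpoint.keepAlive()`, and timeout and margin are parameters only for illustration. It also logs a failing endpoint and continues, whereas the diff's `return` stops keep-alives for every endpoint; that is a design choice, not the PR's behavior.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the PR's code. KeepAliveTarget stands in for
// NvmfStorageEndpoint.keepAlive(); timeout and margin are assumed inputs.
final class KeepAliveScheduler {
    interface KeepAliveTarget {
        void keepAlive() throws Exception;
    }

    /** Period between keep-alive rounds: the controller timeout minus a safety margin. */
    static long intervalMillis(long timeoutMillis, long marginMillis) {
        if (marginMillis <= 0 || marginMillis >= timeoutMillis) {
            throw new IllegalArgumentException("margin " + marginMillis
                    + " ms must be in (0, " + timeoutMillis + ") ms");
        }
        return timeoutMillis - marginMillis;
    }

    static ScheduledExecutorService start(List<? extends KeepAliveTarget> targets,
                                          long timeoutMillis, long marginMillis) {
        long interval = intervalMillis(timeoutMillis, marginMillis);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Fixed rate: the period is measured from the start of each round, so the
        // time spent visiting many endpoints does not push the next round later.
        scheduler.scheduleAtFixedRate(() -> {
            for (KeepAliveTarget target : targets) {
                try {
                    target.keepAlive();
                } catch (Exception e) {
                    // Keep serving the remaining endpoints instead of ending the loop.
                    System.err.println("keep-alive failed: " + e);
                }
            }
        }, interval, interval, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

With a 120 s timeout and a 10 s margin, `intervalMillis` returns 110000 ms, matching the value in the diff. The executor thread is non-daemon, so the caller must call `shutdown()` on close.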


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180317997
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java ---
    @@ -19,208 +19,229 @@
     
     package org.apache.crail.storage.nvmf.client;
     
    -import com.ibm.disni.nvmef.NvmeCommand;
    -import com.ibm.disni.nvmef.NvmeEndpoint;
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.IOCompletion;
    -
    +import com.ibm.jnvmf.*;
     import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.CrailStatistics;
     import org.apache.crail.conf.CrailConstants;
    -import org.apache.crail.memory.BufferCache;
     import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.metadata.DataNodeInfo;
     import org.apache.crail.storage.StorageEndpoint;
     import org.apache.crail.storage.StorageFuture;
    -import org.apache.crail.storage.nvmf.NvmfBufferCache;
     import org.apache.crail.storage.nvmf.NvmfStorageConstants;
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
     import java.io.IOException;
    +import java.net.InetAddress;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    -import java.net.URISyntaxException;
    -import java.util.concurrent.*;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicInteger;
     
     public class NvmfStorageEndpoint implements StorageEndpoint {
     	private static final Logger LOG = CrailUtils.getLogger();
     
    -	private final InetSocketAddress inetSocketAddress;
    -	private final NvmeEndpoint endpoint;
    -	private final int sectorSize;
    -	private final BufferCache cache;
    -	private final BlockingQueue<NvmeCommand> freeCommands;
    -	private final NvmeCommand[] commands;
    -	private final NvmfStorageFuture[] futures;
    -	private final ThreadLocal<long[]> completed;
    -	private final int ioQeueueSize;
    -
    -	public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException {
    -		this.inetSocketAddress = inetSocketAddress;
    -		endpoint = group.createEndpoint();
    +	private final Controller controller;
    +	private final IoQueuePair queuePair;
    +	private final int lbaDataSize;
    +	private final long namespaceCapacity;
    +	private final NvmfRegisteredBufferCache registeredBufferCache;
    +	private final NvmfStagingBufferCache stagingBufferCache;
    +	private final CrailStatistics statistics;
    +
    +	private final Queue<NvmWriteCommand> writeCommands;
    +	private final Queue<NvmReadCommand> readCommands;
    +
    +	private final AtomicInteger outstandingOperations;
    +
    +	public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics,
    +							   CrailBufferCache bufferCache) throws IOException {
    +		InetSocketAddress inetSocketAddress = new InetSocketAddress(
    +				InetAddress.getByAddress(info.getIpAddress()), info.getPort());
    +		// XXX FIXME: nsid from datanodeinfo
    +		NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress,
    +				new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
    +		LOG.info("Connecting to NVMf target at " + transportId.toString());
    +		controller = nvme.connect(transportId);
    +		controller.getControllerConfiguration().setEnable(true);
    +		controller.syncConfiguration();
     		try {
    -			URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
    -					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1");
    -			LOG.info("Connecting to " + url.toString());
    -			endpoint.connect(url);
    -		} catch (URISyntaxException e) {
    -			//FIXME
    -			e.printStackTrace();
    +			controller.waitUntilReady();
    +		} catch (TimeoutException e) {
    +			throw new IOException(e);
     		}
    -		sectorSize = endpoint.getSectorSize();
    -		cache = new NvmfBufferCache();
    -		ioQeueueSize = endpoint.getIOQueueSize();
    -		freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
    -		commands = new NvmeCommand[ioQeueueSize];
    -		for (int i = 0; i < ioQeueueSize; i++) {
    -			NvmeCommand command = endpoint.newCommand();
    -			command.setId(i);
    -			commands[i] = command;
    -			freeCommands.add(command);
    +		IdentifyControllerData identifyControllerData = controller.getIdentifyControllerData();
    +		if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" +
    +					identifyControllerData.getMaximumDataTransferSize() + ")");
     		}
    -		futures = new NvmfStorageFuture[ioQeueueSize];
    -		completed = new ThreadLocal<long[]>() {
    -			public long[] initialValue() {
    -				return new long[ioQeueueSize];
    +		List<Namespace> namespaces = controller.getActiveNamespaces();
    +		//TODO: poll nsid in datanodeinfo
    +		NamespaceIdentifier namespaceIdentifier = new NamespaceIdentifier(1);
    +		Namespace namespace = null;
    +		for (Namespace n : namespaces) {
    +			if (n.getIdentifier().equals(namespaceIdentifier)) {
    +				namespace = n;
    +				break;
     			}
    -		};
    +		}
    +		if (namespace == null) {
    +			throw new IllegalArgumentException("No namespace with id " + namespaceIdentifier +
    +					" at controller " + transportId.toString());
    +		}
    +		IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData();
    +		lbaDataSize = identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
    +		if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
    +					" is not a multiple of LBA data size (" + lbaDataSize + ")");
    +		}
    +		namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
    +		this.queuePair = controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
    +				SubmissionQueueEntry.SIZE);
    +
    +		this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		this.readCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
    +			NvmWriteCommand writeCommand = new NvmWriteCommand(queuePair);
    +			writeCommand.setSendInline(true);
    +			writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			writeCommands.add(writeCommand);
    +			NvmReadCommand readCommand = new NvmReadCommand(queuePair);
    +			readCommand.setSendInline(true);
    +			readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			readCommands.add(readCommand);
    +		}
    +		this.registeredBufferCache = new NvmfRegisteredBufferCache(queuePair);
    +		this.outstandingOperations = new AtomicInteger(0);
    +		this.stagingBufferCache = new NvmfStagingBufferCache(bufferCache,
    +				NvmfStorageConstants.STAGING_CACHE_SIZE, getLBADataSize());
    +		this.statistics = statistics;
    +	}
    +
    +	public void keepAlive() throws IOException {
    +		controller.keepAlive();
    +	}
    +
    +	public int getLBADataSize() {
    +		return lbaDataSize;
     	}
     
    -	public int getSectorSize() {
    -		return sectorSize;
    +	public long getNamespaceCapacity() {
    +		return namespaceCapacity;
     	}
     
     	enum Operation {
     		WRITE,
    -		READ;
    +		READ
     	}
     
    -	public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset)
    -			throws IOException, InterruptedException {
    -		int length = buffer.remaining();
    -		if (length > CrailConstants.BLOCK_SIZE){
    -			throw new IOException("write size too large " + length);
    -		}
    -		if (length <= 0){
    -			throw new IOException("write size too small, len " + length);
    -		}
    -		if (buffer.position() < 0){
    -			throw new IOException("local offset too small " + buffer.position());
    -		}
    -		if (remoteOffset < 0){
    -			throw new IOException("remote offset too small " + remoteOffset);
    -		}
    +	void putOperation() {
    +		outstandingOperations.decrementAndGet();
    +	}
     
    -		if (remoteMr.getAddr() + remoteOffset + length > endpoint.getNamespaceSize()){
    -			long tmpAddr = remoteMr.getAddr() + remoteOffset + length;
    -			throw new IOException("remote fileOffset + remoteOffset + len = " + tmpAddr + " - size = " +
    -					endpoint.getNamespaceSize());
    +	private boolean tryGetOperation() {
    +		int outstandingOperationsOld = outstandingOperations.get();
    +		if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE) {
    +			return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1);
     		}
    +		return false;
    +	}
     
    -//		LOG.info("op = " + op.name() +
    -//				", position = " + buffer.position() +
    -//				", localOffset = " + buffer.position() +
    -//				", remoteOffset = " + remoteOffset +
    -//				", remoteAddr = " + remoteMr.getAddr() +
    -//				", length = " + length);
    -
    -		NvmeCommand command = freeCommands.poll();
    -		while (command == null) {
    -			poll();
    -			command = freeCommands.poll();
    -		}
    +	private static int divCeil(int a, int b) {
    +		return (a + b - 1) / b;
    +	}
     
    -		boolean aligned = NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
    -				&& NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
    -		long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, remoteOffset, sectorSize);
    -		StorageFuture future = null;
    -		if (aligned) {
    -//			LOG.info("aligned");
    -			command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
    -			switch(op) {
    -				case READ:
    -					command.read();
    -					break;
    -				case WRITE:
    -					command.write();
    -					break;
    -			}
    -			future = futures[(int)command.getId()] = new NvmfStorageFuture(this, length);
    -			command.execute();
    -		} else {
    -//			LOG.info("unaligned");
    -			long alignedLength = NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
    +	private int getNumLogicalBlocks(CrailBuffer buffer) {
    +		return divCeil(buffer.remaining(), getLBADataSize());
    +	}
     
    -			CrailBuffer stagingBuffer = cache.allocateBuffer();
    -			stagingBuffer.limit((int)alignedLength);
    +	StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException {
    +		assert blockInfo.getAddr() + remoteOffset + buffer.remaining() <= getNamespaceCapacity();
    +		assert remoteOffset >= 0;
    +		assert buffer.remaining() <= CrailConstants.BLOCK_SIZE;
    +
    +		long startingAddress = blockInfo.getAddr() + remoteOffset;
    +		if (startingAddress % getLBADataSize() != 0 ||
    +				((startingAddress + buffer.remaining()) % getLBADataSize() != 0 && op == Operation.WRITE)) {
    +			if (op == Operation.READ) {
    +				throw new IOException("Unaligned read access is not supported. Address (" + startingAddress +
    +						") needs to be multiple of LBA data size " + getLBADataSize());
    +			}
     			try {
    -				switch(op) {
    -					case READ: {
    -						NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, (int)alignedLength);
    -						command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
    -						future = new NvmfStorageUnalignedReadFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer);
    -						break;
    -					}
    -					case WRITE: {
    -						if (NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0) {
    -							// Do not read if the offset is aligned to sector size
    -							int sizeToWrite = length;
    -							stagingBuffer.put(buffer.getByteBuffer());
    -							stagingBuffer.position(0);
    -							command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).write().execute();
    -							future = futures[(int)command.getId()] = new NvmfStorageUnalignedWriteFuture(this, sizeToWrite, stagingBuffer);
    -						} else {
    -							// RMW but append only file system allows only reading last sector
    -							// and dir entries are sector aligned
    -							stagingBuffer.limit(sectorSize);
    -							NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, sectorSize);
    -							command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
    -							future = new NvmfStorageUnalignedRMWFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer);
    -						}
    -						break;
    -					}
    -				}
    -			} catch (NoSuchFieldException e) {
    -				throw new IOException(e);
    -			} catch (IllegalAccessException e) {
    +				return new NvmfUnalignedWriteFuture(this, buffer, blockInfo, remoteOffset);
    +			} catch (Exception e) {
     				throw new IOException(e);
     			}
     		}
     
    +		if (!tryGetOperation()) {
    +			do {
    +				poll();
    +			} while (!tryGetOperation());
    +		}
    +
    +		NvmIoCommand<? extends NvmIoCommandCapsule> command;
    +		NvmfFuture<?> future;
    +		Response<NvmResponseCapsule> response;
    +		if (op == Operation.READ) {
    +			NvmReadCommand readCommand = readCommands.remove();
    +			response = readCommand.newResponse();
    +			future = new NvmfFuture<>(this, readCommand, response, readCommands, buffer.remaining());
    +			command = readCommand;
    +		} else {
    +			NvmWriteCommand writeCommand = writeCommands.remove();
    +			response = writeCommand.newResponse();
    +			future = new NvmfFuture<>(this, writeCommand, response, writeCommands, buffer.remaining());
    +			command = writeCommand;
    +		}
    +		command.setCallback(future);
    +		response.setCallback(future);
    +
    +		NvmIoCommandSqe sqe = command.getCommandCapsule().getSubmissionQueueEntry();
    +		long startingLBA = startingAddress / getLBADataSize();
    +		sqe.setStartingLba(startingLBA);
    +		/* TODO: on read this potentially overwrites data beyond the set limit */
    +		short numLogicalBlocks = (short)getNumLogicalBlocks(buffer);
    --- End diff --
    
    Any particular reason this is a short rather than an integer?
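A plausible reason for `short`: in NVM Read/Write commands the Number of Logical Blocks (NLB) field is 16 bits wide (CDW12 bits 15:0) and 0's based, so it encodes 1 to 65536 blocks. This diff does not show whether jNVMf's setter expects the raw count or count minus one, so the helper below assumes the raw 0's-based field is written. It is a hypothetical illustration, not jNVMf API. It also shows why a bare `(short)` cast needs a range check: counts above 32768 turn into negative shorts.

```java
// Hypothetical helper, not jNVMf API. Assumes the value is stored directly in the
// 16-bit, 0's based NLB field of an NVM Read/Write command (CDW12 bits 15:0).
final class NlbField {
    static final int MAX_BLOCKS = 1 << 16; // 0xFFFF + 1, since 0 means "one block"

    static short encode(int numBlocks) {
        if (numBlocks < 1 || numBlocks > MAX_BLOCKS) {
            throw new IllegalArgumentException("number of logical blocks out of range: " + numBlocks);
        }
        // Counts above 32768 become negative shorts; the 16-bit pattern is still correct.
        return (short) (numBlocks - 1);
    }

    static int decode(short field) {
        return Short.toUnsignedInt(field) + 1;
    }
}
```

Given that the constructor already rejects a slice size above the controller's maximum data transfer size, the overflow case may never be reached in practice. An explicit check still documents the limit.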


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180337304
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java ---
    @@ -86,15 +100,15 @@ public void run() {
     	@Override
     	public StorageResource allocateResource() throws Exception {
     		StorageResource resource = null;
    -		
    +
     		if (alignedSize > 0){
     			LOG.info("new block, length " + NvmfStorageConstants.ALLOCATION_SIZE);
    -			LOG.debug("block stag 0, offset " + offset + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
    +			LOG.debug("block stag 0, address " + address + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
     			alignedSize -= NvmfStorageConstants.ALLOCATION_SIZE;
    -			resource = StorageResource.createResource(offset, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
    -			offset += NvmfStorageConstants.ALLOCATION_SIZE;
    +			resource = StorageResource.createResource(address, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
    --- End diff --
    
    Good point. Not sure why I made them long in the first place.


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180315522
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Copyright (C) 2018, IBM Corporation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.storage.StorageFuture;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +public class NvmfStagingBufferCache {
    +	private final Map<Long, BufferCacheEntry> remoteAddressMap;
    +	private final Queue<CrailBuffer> freeBuffers;
    +	private int buffersLeft;
    +	private final int lbaDataSize;
    +	private final CrailBufferCache bufferCache;
    +
    +	private final CrailBufferCache getBufferCache() {
    +		return bufferCache;
    +	}
    +
    +	NvmfStagingBufferCache(CrailBufferCache bufferCache, int maxEntries, int lbaDataSize) {
    +		if (maxEntries <= 0) {
    --- End diff --
    
    Here as well, print the value that caused the error.
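A small helper keeps such checks uniform by naming the parameter and echoing the rejected value. `Checks.requirePositive` is a hypothetical name, not an existing Crail utility; inside the `NvmfStagingBufferCache` constructor it would replace the bare `maxEntries <= 0` test.

```java
// Hypothetical helper (not in Crail): name the parameter and echo the rejected value.
final class Checks {
    static int requirePositive(String name, int value) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be > 0, but was " + value);
        }
        return value;
    }
}
```

Usage: `int entries = Checks.requirePositive("maxEntries", maxEntries);` fails with `maxEntries must be > 0, but was -1` for a bad value.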


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by PepperJo <gi...@git.apache.org>.
Github user PepperJo commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180332561
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Copyright (C) 2018, IBM Corporation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.crail.storage.nvmf.client;
    +
    +import org.apache.crail.CrailBuffer;
    +import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.storage.StorageFuture;
    +import org.apache.crail.storage.StorageResult;
    +
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +
    +public class NvmfUnalignedWriteFuture implements StorageFuture {
    +	private final NvmfStorageEndpoint endpoint;
    +	private StorageFuture beginFuture;
    +	private StorageFuture middleFuture;
    +	private StorageFuture endFuture;
    +	private final int written;
    +	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
    +	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
    +
    +	private final boolean isSectorAligned(long address) {
    +		return address % endpoint.getLBADataSize() == 0;
    +	}
    +
    +	private final long floorToSectorSize(long address) {
    +		return address - (address % endpoint.getLBADataSize());
    +	}
    +
    +	private final int leftInSector(long address) {
    +		return endpoint.getLBADataSize() - offsetInSector(address);
    +	}
    +
    +	private final int offsetInSector(long address) {
    +		return (int)(address % endpoint.getLBADataSize());
    +	}
    +
    +	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
    +		this.endpoint = endpoint;
    +		this.written = buffer.remaining();
    +		/* assume blockInfo.getAddr() is sector aligned */
    +		assert isSectorAligned(blockInfo.getAddr());
    +
    +		long nextRemoteOffset = remoteOffset;
    +		/* beginning */
    +		if (!isSectorAligned(remoteOffset)) {
    +			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
    +			nextRemoteOffset = remoteOffset + copySize;
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + copySize);
    +			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
    +			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
    +			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
    +			if (beginBuffer == null) {
    +				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
    +				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
    +				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
    +			} else {
    +				/* Wait for previous end operation to finish */
    +				beginBuffer.getFuture().get();
    +			}
    +			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			buffer.limit(oldLimit);
    +			stagingBuffer.position(0);
    +			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
    +			beginBuffer.setFuture(beginFuture);
    +			stagingBuffer.position(offsetInSector(remoteOffset));
    +		}
    +
    +		/* middle */
    +		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
    +			int oldLimit = buffer.limit();
    +			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
    +			int toWrite = buffer.remaining();
    +			middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
    +			nextRemoteOffset += toWrite;
    +			buffer.position(buffer.limit());
    +			buffer.limit(oldLimit);
    +		}
    +
    +		/* end */
    +		if (buffer.remaining() > 0) {
    +			endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + nextRemoteOffset);
    +			CrailBuffer stagingBuffer = endBuffer.getBuffer();
    +			stagingBuffer.position(0);
    +			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
    +			stagingBuffer.position(0);
    +			endFuture = endpoint.write(stagingBuffer, blockInfo, nextRemoteOffset);
    +			endBuffer.setFuture(endFuture);
    +		}
    +	}
    +
    +	@Override
    +	public boolean isSynchronous() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean cancel(boolean b) {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isCancelled() {
    +		return false;
    +	}
    +
    +	private static boolean checkIfFutureIsDone(StorageFuture future) {
    --- End diff --
    
    Yes. Any of begin, middle, and end can be null; take a look at the constructor. Each one is only issued when it is actually needed.
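Because each of the three parts may be absent, the completion checks have to treat `null` as "nothing to wait for". The sketch below uses plain `java.util.concurrent.Future`. Crail's `StorageFuture` appears to extend it, judging by the `cancel`/`isCancelled` overrides in the diff, but that is an assumption here. `SplitFutures` is a hypothetical name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Sketch only: uses plain Future instead of Crail's StorageFuture. A null part
// means that piece of the write was never issued, so it counts as done.
final class SplitFutures {
    static boolean isDone(Future<?> part) {
        return part == null || part.isDone();
    }

    static boolean allDone(Future<?> begin, Future<?> middle, Future<?> end) {
        return isDone(begin) && isDone(middle) && isDone(end);
    }

    /** Blocks until every issued part has completed, skipping parts that were never issued. */
    static void awaitAll(Future<?> begin, Future<?> middle, Future<?> end)
            throws InterruptedException, ExecutionException {
        for (Future<?> part : new Future<?>[] {begin, middle, end}) {
            if (part != null) {
                part.get();
            }
        }
    }
}
```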


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180314572
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java ---
    @@ -86,15 +100,15 @@ public void run() {
     	@Override
     	public StorageResource allocateResource() throws Exception {
     		StorageResource resource = null;
    -		
    +
     		if (alignedSize > 0){
     			LOG.info("new block, length " + NvmfStorageConstants.ALLOCATION_SIZE);
    -			LOG.debug("block stag 0, offset " + offset + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
    +			LOG.debug("block stag 0, address " + address + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
     			alignedSize -= NvmfStorageConstants.ALLOCATION_SIZE;
    -			resource = StorageResource.createResource(offset, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
    -			offset += NvmfStorageConstants.ALLOCATION_SIZE;
    +			resource = StorageResource.createResource(address, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
    --- End diff --
    
    The same goes for the other NvmfStorageConstants constants.


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180314443
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java ---
    @@ -86,15 +100,15 @@ public void run() {
     	@Override
     	public StorageResource allocateResource() throws Exception {
     		StorageResource resource = null;
    -		
    +
     		if (alignedSize > 0){
     			LOG.info("new block, length " + NvmfStorageConstants.ALLOCATION_SIZE);
    -			LOG.debug("block stag 0, offset " + offset + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
    +			LOG.debug("block stag 0, address " + address + ", length " + NvmfStorageConstants.ALLOCATION_SIZE);
     			alignedSize -= NvmfStorageConstants.ALLOCATION_SIZE;
    -			resource = StorageResource.createResource(offset, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
    -			offset += NvmfStorageConstants.ALLOCATION_SIZE;
    +			resource = StorageResource.createResource(address, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
    --- End diff --
    
    #sad. Why not make ALLOCATION_SIZE an integer to begin with? It is a long, which is risky: a user might be tempted to configure a value > 2 GB, which the (int) cast here would silently truncate.
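If the config value stays a long string, rejecting out-of-range values at parse time avoids the silent truncation of the later `(int)` cast. The helper below is a hypothetical sketch, not Crail's `updateConstants()`. The key name is passed in rather than assumed. `Math.toIntExact` would also work, but it throws an `ArithmeticException` without naming the offending key.

```java
// Hypothetical parsing helper, not Crail's updateConstants(). It still accepts the
// value as a long string but refuses anything that would not survive an (int) cast.
final class AllocationSizeConfig {
    static int parseAllocationSize(String key, String value) {
        long parsed = Long.parseLong(value.trim());
        if (parsed <= 0 || parsed > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(key + " must be in (0, " + Integer.MAX_VALUE
                    + "], but was " + parsed);
        }
        return (int) parsed;
    }
}
```

Without the check, `(int) 4294967296L` (4 GiB) evaluates to 0 and `(int) 3221225472L` (3 GiB) to -1073741824.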


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-crail/pull/16


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180313400
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java ---
    @@ -89,31 +76,31 @@ public static void updateConstants(CrailConfiguration conf) throws UnknownHostEx
     
     		arg = get(conf, NQN_KEY);
     		if (arg != null) {
    -			NQN = arg;
    +			NQN = new NvmeQualifiedName(arg);
     		}
     
     		arg = get(conf, ALLOCATION_SIZE_KEY);
     		if (arg != null) {
     			ALLOCATION_SIZE = Long.parseLong(arg);
     		}
     
    -		arg = get(conf, SERVER_MEMPOOL_KEY);
    +		arg = get(conf, QUEUE_SIZE_KEY);
     		if (arg != null) {
    -			SERVER_MEMPOOL = Long.parseLong(arg);
    +			QUEUE_SIZE = Integer.parseInt(arg);
     		}
     
    -		arg = get(conf, CLIENT_MEMPOOL_KEY);
    +		arg = get(conf, STAGING_CACHE_SIZE_KEY);
     		if (arg != null) {
    -			CLIENT_MEMPOOL = Long.parseLong(arg);
    +			STAGING_CACHE_SIZE = Integer.parseInt(arg);
     		}
     	}
     
    -	public static void verify() throws IOException {
    -		if (NAMESPACE <= 0){
    -			throw new IOException("Namespace must be > 0");
    -		}
    +	public static void verify() {
     		if (ALLOCATION_SIZE % CrailConstants.BLOCK_SIZE != 0){
    -			throw new IOException("allocationsize must be multiple of crail.blocksize");
    +			throw new IllegalArgumentException("allocationsize must be multiple of crail.blocksize");
    --- End diff --
    
    I would usually recommend printing as much useful information as possible in exceptions. Can we include the crail.blocksize and allocation size values we actually got here?
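A possible shape for such a message, as a minimal sketch: the values are passed in as parameters here so the example is self-contained, whereas in Crail they would come from `CrailConstants.BLOCK_SIZE` and `NvmfStorageConstants.ALLOCATION_SIZE`. The class name `NvmfVerifySketch` is hypothetical.

```java
// Hypothetical sketch of verify() with an informative error message.
public class NvmfVerifySketch {
    static void verify(long allocationSize, long blockSize) {
        if (allocationSize % blockSize != 0) {
            // Report both offending values so the misconfiguration is obvious from the log.
            throw new IllegalArgumentException("allocationsize (" + allocationSize
                    + ") must be a multiple of crail.blocksize (" + blockSize + ")");
        }
    }

    public static void main(String[] args) {
        verify(1L << 30, 1L << 20); // 1 GiB is a multiple of 1 MiB: passes
        try {
            verify(1_000_000L, 1L << 20);
        } catch (IllegalArgumentException e) {
            // prints: allocationsize (1000000) must be a multiple of crail.blocksize (1048576)
            System.out.println(e.getMessage());
        }
    }
}
```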


---

[GitHub] incubator-crail pull request #16: New NVMf storage based on jNVMf library

Posted by animeshtrivedi <gi...@git.apache.org>.
Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180317810
  
    --- Diff: storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java ---
    @@ -19,208 +19,229 @@
     
     package org.apache.crail.storage.nvmf.client;
     
    -import com.ibm.disni.nvmef.NvmeCommand;
    -import com.ibm.disni.nvmef.NvmeEndpoint;
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.IOCompletion;
    -
    +import com.ibm.jnvmf.*;
     import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.CrailStatistics;
     import org.apache.crail.conf.CrailConstants;
    -import org.apache.crail.memory.BufferCache;
     import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.metadata.DataNodeInfo;
     import org.apache.crail.storage.StorageEndpoint;
     import org.apache.crail.storage.StorageFuture;
    -import org.apache.crail.storage.nvmf.NvmfBufferCache;
     import org.apache.crail.storage.nvmf.NvmfStorageConstants;
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
     import java.io.IOException;
    +import java.net.InetAddress;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    -import java.net.URISyntaxException;
    -import java.util.concurrent.*;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicInteger;
     
     public class NvmfStorageEndpoint implements StorageEndpoint {
     	private static final Logger LOG = CrailUtils.getLogger();
     
    -	private final InetSocketAddress inetSocketAddress;
    -	private final NvmeEndpoint endpoint;
    -	private final int sectorSize;
    -	private final BufferCache cache;
    -	private final BlockingQueue<NvmeCommand> freeCommands;
    -	private final NvmeCommand[] commands;
    -	private final NvmfStorageFuture[] futures;
    -	private final ThreadLocal<long[]> completed;
    -	private final int ioQeueueSize;
    -
    -	public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException {
    -		this.inetSocketAddress = inetSocketAddress;
    -		endpoint = group.createEndpoint();
    +	private final Controller controller;
    +	private final IoQueuePair queuePair;
    +	private final int lbaDataSize;
    +	private final long namespaceCapacity;
    +	private final NvmfRegisteredBufferCache registeredBufferCache;
    +	private final NvmfStagingBufferCache stagingBufferCache;
    +	private final CrailStatistics statistics;
    +
    +	private final Queue<NvmWriteCommand> writeCommands;
    +	private final Queue<NvmReadCommand> readCommands;
    +
    +	private final AtomicInteger outstandingOperations;
    +
    +	public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics,
    +							   CrailBufferCache bufferCache) throws IOException {
    +		InetSocketAddress inetSocketAddress = new InetSocketAddress(
    +				InetAddress.getByAddress(info.getIpAddress()), info.getPort());
    +		// XXX FIXME: nsid from datanodeinfo
    +		NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress,
    +				new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
    +		LOG.info("Connecting to NVMf target at " + transportId.toString());
    +		controller = nvme.connect(transportId);
    +		controller.getControllerConfiguration().setEnable(true);
    +		controller.syncConfiguration();
     		try {
    -			URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
    -					"/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1");
    -			LOG.info("Connecting to " + url.toString());
    -			endpoint.connect(url);
    -		} catch (URISyntaxException e) {
    -			//FIXME
    -			e.printStackTrace();
    +			controller.waitUntilReady();
    +		} catch (TimeoutException e) {
    +			throw new IOException(e);
     		}
    -		sectorSize = endpoint.getSectorSize();
    -		cache = new NvmfBufferCache();
    -		ioQeueueSize = endpoint.getIOQueueSize();
    -		freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
    -		commands = new NvmeCommand[ioQeueueSize];
    -		for (int i = 0; i < ioQeueueSize; i++) {
    -			NvmeCommand command = endpoint.newCommand();
    -			command.setId(i);
    -			commands[i] = command;
    -			freeCommands.add(command);
    +		IdentifyControllerData identifyControllerData = controller.getIdentifyControllerData();
    +		if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" +
    +					identifyControllerData.getMaximumDataTransferSize() + ")");
     		}
    -		futures = new NvmfStorageFuture[ioQeueueSize];
    -		completed = new ThreadLocal<long[]>() {
    -			public long[] initialValue() {
    -				return new long[ioQeueueSize];
    +		List<Namespace> namespaces = controller.getActiveNamespaces();
    +		//TODO: poll nsid in datanodeinfo
    +		NamespaceIdentifier namespaceIdentifier = new NamespaceIdentifier(1);
    +		Namespace namespace = null;
    +		for (Namespace n : namespaces) {
    +			if (n.getIdentifier().equals(namespaceIdentifier)) {
    +				namespace = n;
    +				break;
     			}
    -		};
    +		}
    +		if (namespace == null) {
    +			throw new IllegalArgumentException("No namespace with id " + namespaceIdentifier +
    +					" at controller " + transportId.toString());
    +		}
    +		IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData();
    +		lbaDataSize = identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
    +		if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
    +			throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
    +					" is not a multiple of LBA data size (" + lbaDataSize + ")");
    +		}
    +		namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
    +		this.queuePair = controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
    +				SubmissionQueueEntry.SIZE);
    +
    +		this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		this.readCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +		for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
    +			NvmWriteCommand writeCommand = new NvmWriteCommand(queuePair);
    +			writeCommand.setSendInline(true);
    +			writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			writeCommands.add(writeCommand);
    +			NvmReadCommand readCommand = new NvmReadCommand(queuePair);
    +			readCommand.setSendInline(true);
    +			readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +			readCommands.add(readCommand);
    +		}
    +		this.registeredBufferCache = new NvmfRegisteredBufferCache(queuePair);
    +		this.outstandingOperations = new AtomicInteger(0);
    +		this.stagingBufferCache = new NvmfStagingBufferCache(bufferCache,
    +				NvmfStorageConstants.STAGING_CACHE_SIZE, getLBADataSize());
    +		this.statistics = statistics;
    +	}
    +
    +	public void keepAlive() throws IOException {
    +		controller.keepAlive();
    +	}
    +
    +	public int getLBADataSize() {
    +		return lbaDataSize;
     	}
     
    -	public int getSectorSize() {
    -		return sectorSize;
    +	public long getNamespaceCapacity() {
    +		return namespaceCapacity;
     	}
     
     	enum Operation {
     		WRITE,
    -		READ;
    +		READ
     	}
     
    -	public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset)
    -			throws IOException, InterruptedException {
    -		int length = buffer.remaining();
    -		if (length > CrailConstants.BLOCK_SIZE){
    -			throw new IOException("write size too large " + length);
    -		}
    -		if (length <= 0){
    -			throw new IOException("write size too small, len " + length);
    -		}
    -		if (buffer.position() < 0){
    -			throw new IOException("local offset too small " + buffer.position());
    -		}
    -		if (remoteOffset < 0){
    -			throw new IOException("remote offset too small " + remoteOffset);
    -		}
    +	void putOperation() {
    +		outstandingOperations.decrementAndGet();
    +	}
     
    -		if (remoteMr.getAddr() + remoteOffset + length > endpoint.getNamespaceSize()){
    -			long tmpAddr = remoteMr.getAddr() + remoteOffset + length;
    -			throw new IOException("remote fileOffset + remoteOffset + len = " + tmpAddr + " - size = " +
    -					endpoint.getNamespaceSize());
    +	private boolean tryGetOperation() {
    +		int outstandingOperationsOld = outstandingOperations.get();
    +		if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE) {
    +			return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1);
     		}
    +		return false;
    +	}
     
    -//		LOG.info("op = " + op.name() +
    -//				", position = " + buffer.position() +
    -//				", localOffset = " + buffer.position() +
    -//				", remoteOffset = " + remoteOffset +
    -//				", remoteAddr = " + remoteMr.getAddr() +
    -//				", length = " + length);
    -
    -		NvmeCommand command = freeCommands.poll();
    -		while (command == null) {
    -			poll();
    -			command = freeCommands.poll();
    -		}
    +	private static int divCeil(int a, int b) {
    +		return (a + b - 1) / b;
    +	}
     
    -		boolean aligned = NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
    -				&& NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
    -		long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, remoteOffset, sectorSize);
    -		StorageFuture future = null;
    -		if (aligned) {
    -//			LOG.info("aligned");
    -			command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
    -			switch(op) {
    -				case READ:
    -					command.read();
    -					break;
    -				case WRITE:
    -					command.write();
    -					break;
    -			}
    -			future = futures[(int)command.getId()] = new NvmfStorageFuture(this, length);
    -			command.execute();
    -		} else {
    -//			LOG.info("unaligned");
    -			long alignedLength = NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
    +	private int getNumLogicalBlocks(CrailBuffer buffer) {
    +		return divCeil(buffer.remaining(), getLBADataSize());
    +	}
     
    -			CrailBuffer stagingBuffer = cache.allocateBuffer();
    -			stagingBuffer.limit((int)alignedLength);
    +	StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException {
    +		assert blockInfo.getAddr() + remoteOffset + buffer.remaining() <= getNamespaceCapacity();
    --- End diff --
    
    FYI: Java assertions are disabled by default (https://docs.oracle.com/javase/8/docs/technotes/guides/language/assert.html#enable-disable). Checks like this one will not catch anything unless the JVM is run with the `-ea` flag.
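If the bounds check should always be enforced, one option is an explicit check that throws regardless of `-ea`. The sketch below is hypothetical (the helper `checkBounds` is not part of the PR). Its parameters mirror the names used in `Op()`, and it also restores the offset and length checks that the removed code performed:

```java
// Hypothetical sketch: an always-on replacement for the assert in Op().
public class BoundsCheckSketch {
    static void checkBounds(long blockAddr, long remoteOffset, int length, long namespaceCapacity) {
        long end = blockAddr + remoteOffset + length;
        if (remoteOffset < 0 || length <= 0 || end > namespaceCapacity) {
            throw new IllegalArgumentException("access [" + (blockAddr + remoteOffset) + ", " + end
                    + ") is outside namespace capacity " + namespaceCapacity);
        }
    }

    public static void main(String[] args) {
        long capacity = 1L << 30;
        checkBounds(0, 4096, 4096, capacity); // within the namespace: no exception
        try {
            checkBounds(capacity - 4096, 0, 8192, capacity); // runs 4096 bytes past the end
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The cost is one comparison per I/O, which is typically negligible next to an NVMf round trip. In exchange, an out-of-range access fails with a clear message instead of reaching the target.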


---