You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2019/04/18 20:34:33 UTC

[asterixdb] branch master updated: [NO ISSUE] Make IOManager more configurable

This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b3eb7a4  [NO ISSUE] Make IOManager more configurable
b3eb7a4 is described below

commit b3eb7a4e8404cc0c822d3d06dc059368c5633801
Author: Ian Maxon <im...@apache.org>
AuthorDate: Wed Apr 17 16:42:55 2019 -0700

    [NO ISSUE] Make IOManager more configurable
    
    Change-Id: I1c8ad11c2b8b983ef4bf7cf78c2f068accddfff4
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3133
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Michael Blow <mb...@apache.org>
    Contrib: Michael Blow <mb...@apache.org>
---
 .../org/apache/asterix/app/nc/NCAppRuntimeContext.java |  6 ++++--
 .../hyracks/control/common/controllers/NCConfig.java   | 18 ++++++++++++++++--
 .../hyracks/control/nc/NodeControllerService.java      |  4 ++--
 .../org/apache/hyracks/control/nc/io/IOManager.java    | 10 +++++-----
 .../hyracks/examples/btree/helper/RuntimeContext.java  |  2 +-
 .../hyracks/hyracks-storage-common/pom.xml             |  5 -----
 .../storage/common/buffercache/BufferCache.java        | 13 ++++++-------
 .../support/TestStorageManagerComponentHolder.java     |  4 ++--
 .../org/apache/hyracks/test/support/TestUtils.java     |  2 +-
 .../am/lsm/common/test/LSMIndexFileManagerTest.java    |  2 +-
 .../hyracks/storage/common/IOManagerPathTest.java      |  8 ++++----
 11 files changed, 42 insertions(+), 32 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e663b49..8b8f5a0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -89,6 +89,7 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -185,6 +186,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
+        int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
                 MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
@@ -239,11 +241,11 @@ public class NCAppRuntimeContext implements INcApplicationContext {
             replicationChannel = new ReplicationChannel(this);
 
             bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(),
-                    storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(),
+                    storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory(),
                     replicationManager);
         } else {
             bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(),
-                    storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory());
+                    storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory());
         }
 
         /*
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 3619cbb..dd92798 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -88,7 +88,9 @@ public class NCConfig extends ControllerConfig {
         TRACE_CATEGORIES(STRING_ARRAY, new String[0]),
         KEY_STORE_PATH(STRING, (String) null),
         TRUST_STORE_PATH(STRING, (String) null),
-        KEY_STORE_PASSWORD(STRING, (String) null);
+        KEY_STORE_PASSWORD(STRING, (String) null),
+        IO_WORKERS_PER_PARTITION(POSITIVE_INTEGER, 2),
+        IO_QUEUE_SIZE(POSITIVE_INTEGER, 10);
 
         private final IOptionType parser;
         private final String defaultValueDescription;
@@ -217,8 +219,12 @@ public class NCConfig extends ControllerConfig {
                     return "A fully-qualified path to a trust store file that will be used for secured connections";
                 case KEY_STORE_PASSWORD:
                     return "The password to the provided key store";
+                case IO_WORKERS_PER_PARTITION:
+                    return "Number of threads per partition used to write and read from storage";
+                case IO_QUEUE_SIZE:
+                    return "Length of the queue used for requests to write and read";
                 default:
-                    throw new IllegalStateException("NYI: " + this);
+                    throw new IllegalStateException("Not yet implemented: " + this);
             }
         }
 
@@ -575,4 +581,12 @@ public class NCConfig extends ControllerConfig {
     public void setTrustStorePath(String keyStorePath) {
         configManager.set(nodeId, Option.TRUST_STORE_PATH, keyStorePath);
     }
+
+    public int getIOParallelism() {
+        return appConfig.getInt(Option.IO_WORKERS_PER_PARTITION);
+    }
+
+    public int getIOQueueSize() {
+        return appConfig.getInt(Option.IO_QUEUE_SIZE);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 317d59a..517169b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -214,8 +214,8 @@ public class NodeControllerService implements IControllerService {
         ncShutdownHook = new NCShutdownHook(this);
         Runtime.getRuntime().addShutdownHook(ncShutdownHook);
         Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
-        ioManager =
-                new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
+        ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
+                application.getFileDeviceResolver(), ncConfig.getIOParallelism(), ncConfig.getIOQueueSize());
         try {
             workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
             jobletMap = new ConcurrentHashMap<>();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b5cb21a..14404d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -56,7 +56,6 @@ public class IOManager implements IIOManager {
     /*
      * Constants
      */
-    public static final int IO_REQUEST_QUEUE_SIZE = 100; // TODO: Make configurable
     private static final Logger LOGGER = LogManager.getLogger();
     private static final String WORKSPACE_FILE_SUFFIX = ".waf";
     private static final FilenameFilter WORKSPACE_FILES_FILTER = (dir, name) -> name.endsWith(WORKSPACE_FILE_SUFFIX);
@@ -74,7 +73,8 @@ public class IOManager implements IIOManager {
     private int workspaceIndex;
     private final IFileDeviceResolver deviceComputer;
 
-    public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException {
+    public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize)
+            throws HyracksDataException {
         this.ioDevices = Collections.unmodifiableList(devices);
         checkDeviceValidity(devices);
         workspaces = new ArrayList<>();
@@ -93,9 +93,9 @@ public class IOManager implements IIOManager {
         }
         workspaceIndex = 0;
         this.deviceComputer = deviceComputer;
-        submittedRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
-        freeRequests = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
-        int numIoThreads = ioDevices.size() * 2;
+        submittedRequests = new ArrayBlockingQueue<>(queueSize);
+        freeRequests = new ArrayBlockingQueue<>(queueSize);
+        int numIoThreads = ioDevices.size() * ioParallelism;
         executor = Executors.newFixedThreadPool(numIoThreads);
         for (int i = 0; i < numIoThreads; i++) {
             executor.execute(new IoRequestHandler(i, submittedRequests));
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index d107ff2..7ba81e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -60,7 +60,7 @@ public class RuntimeContext {
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50);
         bufferCache = new BufferCache(appCtx.getIoManager(), prs, new DelayPageCleanerPolicy(1000), fileMapManager, 100,
-                threadFactory);
+                10, threadFactory);
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
         localResourceRepository = localResourceRepositoryFactory.createRepository();
         resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 423925b..31037ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -50,11 +50,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-control-nc</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-util</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index b40f252..c4c5d8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.storage.common.buffercache;
 
-import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -76,8 +74,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final AsyncFIFOPageQueueManager fifoWriter;
-    private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache =
-            new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+    private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
 
     private IIOReplicationManager ioReplicationManager;
     private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
@@ -93,8 +90,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
 
     public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
-            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
+            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueuelen,
             ThreadFactory threadFactory) {
+        this.headerPageCache = new ArrayBlockingQueue<>(ioQueuelen);
         this.ioManager = ioManager;
         this.pageSize = pageReplacementStrategy.getPageSize();
         this.maxOpenFiles = maxOpenFiles;
@@ -124,10 +122,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     //this constructor is used when replication is enabled to pass the IIOReplicationManager
     public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
-            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
+            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, int ioQueueLen,
             ThreadFactory threadFactory, IIOReplicationManager ioReplicationManager) {
 
-        this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, threadFactory);
+        this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, ioQueueLen,
+                threadFactory);
         this.ioReplicationManager = ioReplicationManager;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 7b7e850..a2da285 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -107,7 +107,7 @@ public class TestStorageManagerComponentHolder {
             List<IODeviceHandle> devices = new ArrayList<>();
             devices.add(new IODeviceHandle(new File(System.getProperty("user.dir") + File.separator + "target"),
                     "iodev_test_wa"));
-            ioManager = new IOManager(devices, new DefaultDeviceResolver());
+            ioManager = new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
         }
         return ioManager;
     }
@@ -152,7 +152,7 @@ public class TestStorageManagerComponentHolder {
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages);
         IFileMapProvider fileMapProvider = getFileMapProvider();
         bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000),
-                (IFileMapManager) fileMapProvider, maxOpenFiles, threadFactory);
+                (IFileMapManager) fileMapProvider, maxOpenFiles, 10, threadFactory);
         return bufferCache;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index ebfaeb8..e36b655 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -76,7 +76,7 @@ public class TestUtils {
     private static IOManager createIoManager() throws HyracksException {
         List<IODeviceHandle> devices = new ArrayList<>();
         devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
-        return new IOManager(devices, new DefaultDeviceResolver());
+        return new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
     }
 
     public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
index 22456e8..5d10603 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/LSMIndexFileManagerTest.java
@@ -256,7 +256,7 @@ public class LSMIndexFileManagerTest {
             String iodevPath = System.getProperty("java.io.tmpdir") + sep + "test_iodev" + i;
             devices.add(new IODeviceHandle(new File(iodevPath), "wa"));
         }
-        return new IOManager(devices, new DefaultDeviceResolver());
+        return new IOManager(devices, new DefaultDeviceResolver(), 2, 10);
     }
 
     private FileReference simulateMerge(ILSMIndexFileManager fileManager, FileReference a, FileReference b)
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
index e2a875b..b0ecc1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerPathTest.java
@@ -38,8 +38,8 @@ public class IOManagerPathTest {
     public void testPrefixNames() throws HyracksDataException {
         IODeviceHandle shorter = new IODeviceHandle(new File("/tmp/tst/1"), "storage");
         IODeviceHandle longer = new IODeviceHandle(new File("/tmp/tst/11"), "storage");
-        IOManager ioManager =
-                new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }), new DefaultDeviceResolver());
+        IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { shorter, longer }),
+                new DefaultDeviceResolver(), 2, 10);
         FileReference f = ioManager.resolveAbsolutePath("/tmp/tst/11/storage/Foo_idx_foo/my_btree");
         Assert.assertEquals("/tmp/tst/11/storage/Foo_idx_foo/my_btree", f.getAbsolutePath());
     }
@@ -48,8 +48,8 @@ public class IOManagerPathTest {
     public void testDuplicates() throws HyracksDataException {
         IODeviceHandle first = new IODeviceHandle(new File("/tmp/tst/1"), "storage");
         IODeviceHandle second = new IODeviceHandle(new File("/tmp/tst/1"), "storage");
-        IOManager ioManager =
-                new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }), new DefaultDeviceResolver());
+        IOManager ioManager = new IOManager(Arrays.asList(new IODeviceHandle[] { first, second }),
+                new DefaultDeviceResolver(), 2, 19);
     }
 
     @After