You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2021/05/26 23:43:32 UTC

[asterixdb] 26/38: [ASTERIXDB-2896] Increase UDF argument buffer size

This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit a0d1fb7acb588f4eab940501deee738789e9af50
Author: Ian Maxon <ia...@maxons.email>
AuthorDate: Sun May 2 11:23:41 2021 -0700

    [ASTERIXDB-2896] Increase UDF argument buffer size
    
    -user model changes: no
    -storage format changes: no
    -interface changes: yes
    
    Details:
    
    Bump buffer sizes in Python IPC to 1MB for individual
    arguments, and in the case of batching, match the
    buffer size of the Hyracks IPC layer for deserialization.
    
    Change-Id: If847ac3b09406d1e9e6a976a7e0193b6e81bcc8b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11243
    Reviewed-by: Ian Maxon <im...@uci.edu>
    Reviewed-by: Michael Blow <mb...@apache.org>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Contrib: Ian Maxon <im...@uci.edu>
---
 .../ExternalScalarPythonFunctionEvaluator.java        |  9 ++++++---
 .../operators/ExternalAssignBatchRuntimeFactory.java  | 19 ++++++++++++++++---
 .../asterix/external/util/ExternalDataUtils.java      | 19 +++++++++++++++++++
 3 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index eb87399..7c860a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
@@ -77,9 +78,11 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
         for (int i = 0; i < argValues.length; i++) {
             argValues[i] = VoidPointable.FACTORY.createPointable();
         }
-        //TODO: these should be dynamic
-        this.argHolder = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
-        this.outputWrapper = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
+        //TODO: these should be dynamic. this static size picking is a temporary bodge until this works like
+        //      v-size frames do or these construction buffers are removed entirely
+        int maxArgSz = ExternalDataUtils.getArgBufferSize();
+        this.argHolder = ByteBuffer.wrap(new byte[maxArgSz]);
+        this.outputWrapper = ByteBuffer.wrap(new byte[maxArgSz]);
         this.evaluatorContext = ctx;
         this.sourceLocation = sourceLoc;
         this.unpackerInput = new ArrayBufferInput(new byte[0]);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 39e480a..593bac6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -36,6 +36,7 @@ import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.library.PythonLibraryEvaluator;
 import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -61,6 +62,8 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
     private final IExternalFunctionDescriptor[] fnDescs;
     private final int[][] fnArgColumns;
 
+    private int rpcBufferSize;
+
     public ExternalAssignBatchRuntimeFactory(int[] outColumns, IExternalFunctionDescriptor[] fnDescs,
             int[][] fnArgColumns, int[] projectionList) {
         super(projectionList);
@@ -73,6 +76,9 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
     public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
 
         final int[] projectionToOutColumns = new int[projectionList.length];
+        //this is a temporary bodge. these buffers need to work like vsize frames, or be absent entirely
+        int maxArgSz = ExternalDataUtils.getArgBufferSize();
+        rpcBufferSize = ExternalDataUtils.roundUpToNearestFrameSize(maxArgSz, ctx.getInitialFrameSize());
         for (int j = 0; j < projectionList.length; j++) {
             projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
         }
@@ -110,14 +116,14 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                 }
                 argHolders = new ArrayList<>(fnArgColumns.length);
                 for (int i = 0; i < fnArgColumns.length; i++) {
-                    argHolders.add(ctx.allocateFrame());
+                    argHolders.add(ctx.allocateFrame(rpcBufferSize));
                 }
                 outputWrapper = ctx.allocateFrame();
                 nullCalls = new ATypeTag[argHolders.size()][0];
                 numCalls = new int[fnArgColumns.length];
                 batchResults = new ArrayList<>(argHolders.size());
                 for (int i = 0; i < argHolders.size(); i++) {
-                    batchResults.add(new Pair<>(ctx.allocateFrame(), new Counter(-1)));
+                    batchResults.add(new Pair<>(ctx.allocateFrame(rpcBufferSize), new Counter(-1)));
                 }
                 unpackerInput = new ArrayBufferInput(new byte[0]);
                 unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
@@ -230,7 +236,8 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                         if (columnResult != null) {
                             Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
                             if (resultholder.getFirst().capacity() < columnResult.capacity()) {
-                                resultholder.setFirst(ctx.allocateFrame(columnResult.capacity()));
+                                resultholder.setFirst(ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
+                                        columnResult.capacity(), ctx.getInitialFrameSize())));
                             }
                             ByteBuffer resultBuf = resultholder.getFirst();
                             resultBuf.clear();
@@ -262,6 +269,12 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                                 outputWrapper.clear();
                                 outputWrapper.position(0);
                                 Pair<ByteBuffer, Counter> result = batchResults.get(k);
+                                // size the replacement from the result frame; outputWrapper's own size is too small
+                                if (result.getFirst() != null
+                                        && result.getFirst().capacity() > outputWrapper.capacity()) {
+                                    outputWrapper = ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
+                                            result.getFirst().capacity(), ctx.getInitialFrameSize()));
+                                }
                                 int start = outputWrapper.arrayOffset();
                                 ATypeTag functionCalled = nullCalls[k][i];
                                 if (functionCalled == ATypeTag.TYPE) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index f5c62a5..8e94263 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -80,6 +80,7 @@ import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.util.StorageUtil;
 
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
@@ -106,6 +107,8 @@ import software.amazon.awssdk.services.s3.model.S3Response;
 public class ExternalDataUtils {
 
     private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
+    private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
+    private static final int HEADER_FUDGE = 64;
 
     static {
         valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
@@ -997,4 +1000,20 @@ public class ExternalDataUtils {
             }
         }
     }
+
+    public static int roundUpToNearestFrameSize(int size, int framesize) {
+        return size - (size % framesize) + framesize; // smallest framesize multiple > size (positive inputs)
+    }
+
+    /** Buffer size in bytes: system property "udf.buf.size" when usable, else 1MB, plus HEADER_FUDGE. */
+    public static int getArgBufferSize() {
+        final String configured = System.getProperty("udf.buf.size");
+        if (configured != null) {
+            final long requested = StorageUtil.getByteValue(configured) + HEADER_FUDGE;
+            if (requested > 0 && requested < Integer.MAX_VALUE) {
+                return (int) requested;
+            }
+        }
+        return DEFAULT_MAX_ARGUMENT_SZ + HEADER_FUDGE;
+    }
 }