Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/26 05:53:37 UTC

[doris] branch master updated: [fix](multi-catalog)fix max compute scanner OOM and datetime (#20957)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b3c82f57a [fix](multi-catalog)fix max compute scanner OOM and datetime (#20957)
2b3c82f57a is described below

commit 2b3c82f57a35398f2a11ff130867b8664c4e6d56
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Mon Jun 26 13:53:29 2023 +0800

    [fix](multi-catalog)fix max compute scanner OOM and datetime (#20957)
    
    1. Fix the MaxCompute (MC) JNI scanner OOM
    2. Add a second datetime type (nanosecond timestamp) for the MC SDK timestamp
    3. Make the S3 URI property key case-insensitive while at it
    4. Optimize the MaxCompute scanner's parallel scan model
---
 .../doris/maxcompute/MaxComputeColumnValue.java    |  13 ++-
 .../doris/maxcompute/MaxComputeJniScanner.java     | 113 +++++++++++----------
 .../doris/maxcompute/MaxComputeTableScan.java      |  79 ++++++++++++++
 .../doris/tablefunction/S3TableValuedFunction.java |   6 +-
 4 files changed, 154 insertions(+), 57 deletions(-)
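
The scanner's parallel model carves one table read into ranges: each scanner instance receives start_offset and split_size, defaults the start to 0, and clamps the row count by the download session's record count, as in the patched open() below. A minimal sketch of that arithmetic, with hypothetical names (SplitRangeDemo, splitRange) and made-up row counts:

    /** Hypothetical helper mirroring the start/row-count arithmetic in open(). */
    public class SplitRangeDemo {
        // Returns {start, rows}: start defaults to 0 when no offset was given,
        // rows is split_size clamped by the session record count (or all rows).
        static long[] splitRange(long startOffset, long splitSize, long recordCount) {
            long start = startOffset == -1L ? 0 : startOffset;
            long rows = splitSize > 0 ? Math.min(splitSize, recordCount) : recordCount;
            return new long[] {start, rows};
        }

        public static void main(String[] args) {
            long recordCount = 1_000_000L;                               // pretend session.getRecordCount()
            long[] split1 = splitRange(0L, 400_000L, recordCount);       // first split
            long[] split2 = splitRange(400_000L, 400_000L, recordCount); // second split
            long[] whole  = splitRange(-1L, -1L, recordCount);           // no split info: read everything
            System.out.printf("split1 start=%d rows=%d%n", split1[0], split1[1]);
            System.out.printf("split2 start=%d rows=%d%n", split2[0], split2[1]);
            System.out.printf("whole  start=%d rows=%d%n", whole[0], whole[1]);
        }
    }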

diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
index a2237b4cc4..9fa9eea9f6 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
@@ -28,6 +28,7 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
@@ -162,9 +163,15 @@ public class MaxComputeColumnValue implements ColumnValue {
     @Override
     public LocalDateTime getDateTime() {
         skippedIfNull();
-        DateMilliVector datetimeCol = (DateMilliVector) column;
-        LocalDateTime v = datetimeCol.getObject(idx++);
-        return v == null ? LocalDateTime.MIN : v;
+        LocalDateTime result;
+        if (column instanceof DateMilliVector) {
+            DateMilliVector datetimeCol = (DateMilliVector) column;
+            result = datetimeCol.getObject(idx++);
+        } else {
+            TimeStampNanoVector datetimeCol = (TimeStampNanoVector) column;
+            result = datetimeCol.getObject(idx++);
+        }
+        return result == null ? LocalDateTime.MIN : result;
     }
 
     @Override
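
A MaxCompute DATETIME column can now arrive as either a DateMilliVector or a TimeStampNanoVector, and both return LocalDateTime from getObject(), which is all the instanceof dispatch above relies on. A self-contained sketch of that dispatch against the Arrow Java API (the demo class name and the sample timestamp are illustrative, not part of the patch):

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.DateMilliVector;
    import org.apache.arrow.vector.TimeStampNanoVector;
    import org.apache.arrow.vector.ValueVector;

    import java.time.Instant;
    import java.time.LocalDateTime;

    public class DatetimeVectorDemo {
        // Same dispatch as the patched getDateTime(): millisecond dates and
        // nanosecond timestamps both yield a LocalDateTime.
        static LocalDateTime readDateTime(ValueVector column, int idx) {
            LocalDateTime v;
            if (column instanceof DateMilliVector) {
                v = ((DateMilliVector) column).getObject(idx);
            } else {
                v = ((TimeStampNanoVector) column).getObject(idx);
            }
            return v == null ? LocalDateTime.MIN : v;
        }

        public static void main(String[] args) {
            try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
                 TimeStampNanoVector nanos = new TimeStampNanoVector("ts", allocator)) {
                nanos.allocateNew(1);
                long epochNanos = Instant.parse("2023-06-26T13:53:29Z").getEpochSecond() * 1_000_000_000L;
                nanos.setSafe(0, epochNanos);
                nanos.setValueCount(1);
                System.out.println(readDateTime(nanos, 0)); // 2023-06-26T13:53:29
            }
        }
    }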
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 95664c93c3..8f9b903afd 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -22,14 +22,13 @@ import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 
 import com.aliyun.odps.Column;
-import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsType;
-import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.ArrowRecordReader;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.type.TypeInfo;
 import com.aliyun.odps.type.TypeInfoFactory;
 import com.google.common.base.Strings;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.log4j.Logger;
@@ -41,17 +40,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * MaxComputeJ JniScanner. BE will read data from the scanner object.
  */
 public class MaxComputeJniScanner extends JniScanner {
-    private Odps odps;
-    private TableTunnel tunnel;
-
     private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class);
-    private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
-    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
     private static final String REGION = "region";
     private static final String PROJECT = "project";
     private static final String TABLE = "table";
@@ -60,39 +55,28 @@ public class MaxComputeJniScanner extends JniScanner {
     private static final String START_OFFSET = "start_offset";
     private static final String SPLIT_SIZE = "split_size";
     private static final String PUBLIC_ACCESS = "public_access";
+    private static final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
+    private final String region;
     private final String project;
     private final String table;
+    private final MaxComputeTableScan curTableScan;
     private MaxComputeColumnValue columnValue;
     private long remainBatchRows = 0;
     private long totalRows = 0;
-    private TableTunnel.DownloadSession session;
+    private RootAllocator arrowAllocator;
     private ArrowRecordReader curReader;
-    private List<Column> columns;
-    private Map<String, Integer> readColumnsId;
+    private List<Column> readColumns;
+    private Map<String, Integer> readColumnsToId;
     private long startOffset = -1L;
     private long splitSize = -1L;
 
     public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
-        String region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
+        region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
         project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
         table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
-        if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
-                && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
-            startOffset = Long.parseLong(params.get(START_OFFSET));
-            splitSize = Long.parseLong(params.get(SPLIT_SIZE));
-        }
-        String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
-        String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
-        odps = new Odps(new AliyunAccount(accessKey, secretKey));
-        odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
-        odps.setDefaultProject(project);
-        tunnel = new TableTunnel(odps);
-        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
-        boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
-        if (enablePublicAccess) {
-            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
-        }
-        tunnel.setEndpoint(tunnelUrl);
+        tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
+        curTableScan = tableScans.get(tableUniqKey());
+
         String[] requiredFields = params.get("required_fields").split(",");
         String[] types = params.get("columns_types").split("#");
         ColumnType[] columnTypes = new ColumnType[types.length];
@@ -110,44 +94,60 @@ public class MaxComputeJniScanner extends JniScanner {
         initTableInfo(columnTypes, requiredFields, predicates, batchSize);
     }
 
+    private MaxComputeTableScan newTableScan(Map<String, String> params) {
+        if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
+                && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
+            startOffset = Long.parseLong(params.get(START_OFFSET));
+            splitSize = Long.parseLong(params.get(SPLIT_SIZE));
+        }
+        String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
+        String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
+        boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
+        return new MaxComputeTableScan(region, project, table, accessKey, secretKey, enablePublicAccess);
+    }
+
+    public String tableUniqKey() {
+        return region + "#" + project + "." + table;
+    }
+
     @Override
     protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
                                  int batchSize) {
         super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize);
-        columns = new ArrayList<>();
-        readColumnsId = new HashMap<>();
+        readColumns = new ArrayList<>();
+        readColumnsToId = new HashMap<>();
         for (int i = 0; i < fields.length; i++) {
             if (!Strings.isNullOrEmpty(fields[i])) {
-                columns.add(createOdpsColumn(i, types[i]));
-                readColumnsId.put(fields[i], i);
+                readColumns.add(createOdpsColumn(i, types[i]));
+                readColumnsToId.put(fields[i], i);
             }
         }
         // reorder columns
-        List<Column> columnList = odps.tables().get(table).getSchema().getColumns();
+        List<Column> columnList = curTableScan.getSchema().getColumns();
         Map<String, Integer> columnRank = new HashMap<>();
         for (int i = 0; i < columnList.size(); i++) {
             columnRank.put(columnList.get(i).getName(), i);
         }
         // Downloading columns data from Max compute only supports the order of table metadata.
         // We might get an error message if no sort here: Column reorder is not supported in legacy arrow mode.
-        columns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
+        readColumns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
     }
 
     @Override
     public void open() throws IOException {
-        if (columns.isEmpty()) {
+        if (readColumns.isEmpty()) {
             return;
         }
         try {
-            session = tunnel.createDownloadSession(project, table);
-            if (splitSize > 0) {
-                totalRows = Math.min(splitSize, session.getRecordCount());
-            } else {
-                totalRows = session.getRecordCount();
-            }
+            TableTunnel.DownloadSession session = curTableScan.getSession();
             long start = startOffset == -1L ? 0 : startOffset;
-            curReader = session.openArrowRecordReader(start, totalRows, columns);
+            long recordCount = session.getRecordCount();
+            totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : recordCount;
+
+            arrowAllocator = new RootAllocator(Long.MAX_VALUE);
+            curReader = session.openArrowRecordReader(start, totalRows, readColumns, arrowAllocator);
         } catch (Exception e) {
+            close();
             throw new IOException(e);
         }
         remainBatchRows = totalRows;
@@ -206,12 +206,19 @@ public class MaxComputeJniScanner extends JniScanner {
 
     @Override
     public void close() throws IOException {
+        String tableName = tableUniqKey();
+        MaxComputeTableScan scan = tableScans.get(tableName);
+        if (scan != null && scan.endOfScan()) {
+            tableScans.remove(tableName);
+        }
         remainBatchRows = 0;
         totalRows = 0;
         startOffset = -1;
         splitSize = -1;
         if (curReader != null) {
+            arrowAllocator.close();
             curReader.close();
+            curReader = null;
         }
     }
 
@@ -227,6 +234,7 @@ public class MaxComputeJniScanner extends JniScanner {
             return 0;
         }
         remainBatchRows -= realRows;
+        curTableScan.increaseReadRows(realRows);
         return realRows;
     }
 
@@ -234,17 +242,20 @@ public class MaxComputeJniScanner extends JniScanner {
         VectorSchemaRoot batch;
         int curReadRows = 0;
         while (curReadRows < expectedRows && (batch = curReader.read()) != null) {
-            List<FieldVector> fieldVectors = batch.getFieldVectors();
-            int batchRows = 0;
-            for (FieldVector column : fieldVectors) {
-                columnValue.reset(column);
-                // LOG.warn("MCJNI read getClass: " + column.getClass());
-                batchRows = column.getValueCount();
-                for (int j = 0; j < batchRows; j++) {
-                    appendData(readColumnsId.get(column.getName()), columnValue);
+            try {
+                List<FieldVector> fieldVectors = batch.getFieldVectors();
+                int batchRows = 0;
+                for (FieldVector column : fieldVectors) {
+                    columnValue.reset(column);
+                    batchRows = column.getValueCount();
+                    for (int j = 0; j < batchRows; j++) {
+                        appendData(readColumnsToId.get(column.getName()), columnValue);
+                    }
                 }
+                curReadRows += batchRows;
+            } finally {
+                batch.close();
             }
-            curReadRows += batchRows;
         }
         return curReadRows;
     }
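
The OOM fix in this file comes down to two Arrow memory rules that the hunks above apply: every VectorSchemaRoot returned by the reader is closed in a finally block as soon as its rows are appended, and the scanner owns a single RootAllocator that is closed together with the reader in close(). A self-contained sketch of that lifecycle using only the Arrow Java API (the demo class, column name and batch sizes are illustrative):

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.arrow.vector.VectorSchemaRoot;

    public class ArrowBatchLifecycleDemo {
        public static void main(String[] args) {
            // One allocator per scanner instance, closed when the scan ends.
            RootAllocator arrowAllocator = new RootAllocator(Long.MAX_VALUE);
            try {
                for (int batchNo = 0; batchNo < 3; batchNo++) {
                    // Stand-in for curReader.read(): build a tiny one-column batch.
                    IntVector col = new IntVector("c0", arrowAllocator);
                    col.allocateNew(4);
                    for (int i = 0; i < 4; i++) {
                        col.setSafe(i, batchNo * 4 + i);
                    }
                    col.setValueCount(4);
                    VectorSchemaRoot batch = VectorSchemaRoot.of(col);
                    try {
                        // Consume the batch (the real scanner calls appendData here).
                        System.out.println("batch " + batchNo + " rows=" + batch.getRowCount());
                    } finally {
                        batch.close(); // release the batch buffers so they never accumulate
                    }
                }
            } finally {
                // All per-batch memory is back; closing the allocator would fail on a leak.
                System.out.println("still allocated: " + arrowAllocator.getAllocatedMemory() + " bytes");
                arrowAllocator.close();
            }
        }
    }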
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
new file mode 100644
index 0000000000..5102330a4d
--- /dev/null
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+
+import java.io.IOException;
+
+/**
+ * Per-table MaxCompute scan state shared by scanner splits: ODPS client, tunnel download session and read progress.
+ */
+public class MaxComputeTableScan {
+    private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
+    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
+    private final Odps odps;
+    private final TableTunnel tunnel;
+    private final String project;
+    private final String table;
+    private volatile TableTunnel.DownloadSession tableSession;
+    private volatile long readRows = 0;
+
+    public MaxComputeTableScan(String region, String project, String table,
+                               String accessKey, String secretKey, boolean enablePublicAccess) {
+        this.project = project;
+        this.table = table;
+        odps = new Odps(new AliyunAccount(accessKey, secretKey));
+        odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
+        odps.setDefaultProject(this.project);
+        tunnel = new TableTunnel(odps);
+        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        if (enablePublicAccess) {
+            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
+        }
+        tunnel.setEndpoint(tunnelUrl);
+    }
+
+    public TableSchema getSchema() {
+        return odps.tables().get(table).getSchema();
+    }
+
+    public synchronized TableTunnel.DownloadSession getSession() throws IOException {
+        if (tableSession == null) {
+            try {
+                tableSession = tunnel.createDownloadSession(project, table);
+            } catch (TunnelException e) {
+                throw new IOException(e);
+            }
+        }
+        return tableSession;
+    }
+
+    public synchronized void increaseReadRows(long rows) {
+        // multi-thread writing must be synchronized
+        readRows += rows;
+    }
+
+    public boolean endOfScan() {
+        return readRows >= tableSession.getRecordCount();
+    }
+}
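
All splits of one table resolve to the same MaxComputeTableScan through the static ConcurrentHashMap keyed by region#project.table, read progress is accumulated across splits with increaseReadRows(), and close() evicts the entry once endOfScan() reports that every row has been consumed. A simplified, runnable stand-in for that sharing protocol (SharedScan and the row counts are illustrative; the real class also holds the ODPS client and the tunnel download session):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SharedScanDemo {
        static final Map<String, SharedScan> tableScans = new ConcurrentHashMap<>();

        /** Cut-down stand-in for MaxComputeTableScan: only the progress tracking. */
        static class SharedScan {
            private final long recordCount;
            private long readRows = 0;
            SharedScan(long recordCount) { this.recordCount = recordCount; }
            synchronized void increaseReadRows(long rows) { readRows += rows; } // splits write concurrently
            synchronized boolean endOfScan() { return readRows >= recordCount; }
        }

        static String tableUniqKey(String region, String project, String table) {
            return region + "#" + project + "." + table; // same key shape as the patch
        }

        public static void main(String[] args) {
            String key = tableUniqKey("cn-beijing", "demo_project", "demo_table");
            // Two splits of the same table end up sharing one scan object.
            tableScans.putIfAbsent(key, new SharedScan(100));
            SharedScan scan = tableScans.get(key);

            scan.increaseReadRows(60);   // split 1 done
            scan.increaseReadRows(40);   // split 2 done
            if (scan.endOfScan()) {      // mirrors the eviction in MaxComputeJniScanner.close()
                tableScans.remove(key);
            }
            System.out.println("cached scans left: " + tableScans.size()); // 0
        }
    }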
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index cff2b99223..9b820fa185 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -98,9 +98,6 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
     }
 
     private static Map<String, String> getValidParams(Map<String, String> params) throws AnalysisException {
-        if (!params.containsKey(S3_URI)) {
-            throw new AnalysisException("Missing required property: " + S3_URI);
-        }
         Map<String, String> validParams = new HashMap<>();
         for (Map.Entry<String, String> entry : params.entrySet()) {
             String key = entry.getKey();
@@ -113,6 +110,9 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
             }
             validParams.put(lowerKey, entry.getValue());
         }
+        if (!validParams.containsKey(S3_URI)) {
+            throw new AnalysisException("Missing required property: " + S3_URI);
+        }
         return S3Properties.requiredS3TVFProperties(validParams);
     }
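
The S3 TVF change only moves the required-property check to after the keys have been lower-cased, so URI, Uri and uri are all accepted. A stripped-down sketch of that ordering, assuming a canonical key of "uri" (the real constant is S3_URI, and the real getValidParams() also maps deprecated keys and delegates to S3Properties.requiredS3TVFProperties):

    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    public class CaseInsensitiveParamsDemo {
        static final String S3_URI = "uri"; // assumed canonical key

        static Map<String, String> getValidParams(Map<String, String> params) {
            Map<String, String> validParams = new HashMap<>();
            for (Map.Entry<String, String> entry : params.entrySet()) {
                // Normalize first so the check below is case-insensitive.
                validParams.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
            }
            // Check after normalization, not before: this is exactly what the patch moves.
            if (!validParams.containsKey(S3_URI)) {
                throw new IllegalArgumentException("Missing required property: " + S3_URI);
            }
            return validParams;
        }

        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("URI", "s3://bucket/path/file.parquet");
            System.out.println(getValidParams(params)); // {uri=s3://bucket/path/file.parquet}
        }
    }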
 

