Posted to commits@asterixdb.apache.org by mb...@apache.org on 2023/11/18 17:29:44 UTC

(asterixdb) 04/05: Merge branch 'gerrit/trinity' into 'master'

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 9607ea5af9cfd42e5c78ec687b79ac149db91904
Merge: 9b6164e7d2 6c1e847f19
Author: Michael Blow <mi...@couchbase.com>
AuthorDate: Sat Nov 18 09:02:05 2023 -0500

    Merge branch 'gerrit/trinity' into 'master'
    
    Change-Id: I7e0d18ecfdd8d1ea72078f13997c1754fdb8b1cd

 .../apache/asterix/api/common/APIFramework.java    |  12 +-
 .../asterix/app/result/JobResultCallback.java      |   2 +-
 .../asterix/app/translator/QueryTranslator.java    |  10 +-
 .../test/runtime/ProfiledExecutionTest.java        |   3 -
 .../src/test/resources/runtimets/profiled.xml      |   6 +
 .../non-unary-subplan.1.ddl.sqlpp}                 |  14 +-
 .../non-unary-subplan.2.update.sqlpp}              |  15 +-
 .../non-unary-subplan.3.profile.sqlpp}             |  18 +-
 .../profile/sleep/sleep.5.profile.sqlpp            |   2 +-
 .../profile/full-scan/full-scan.3.regexjson        |  18 ++
 .../non-unary-subplan.3.regexjson                  | 241 +++++++++++++++++++++
 .../results/profile/sleep/sleep.3.regexjson        |  28 ++-
 .../results/profile/sleep/sleep.4.regexjson        |  54 ++++-
 .../results/profile/sleep/sleep.5.regexjson        |  60 ++++-
 .../common/ClosedRecordConstructorEvalFactory.java |   5 +
 .../DatasetStreamStatsOperatorDescriptor.java      |   4 +-
 .../api/HeuristicCompilerFactoryBuilder.java       |  13 ++
 .../hyracks/algebricks/compiler/api/ICompiler.java |   6 +
 .../LogicalOperatorPrettyPrintVisitorJson.java     | 118 ++++++++--
 .../algebricks/core/jobgen/impl/JobBuilder.java    |  46 +++-
 .../algebricks/core/jobgen/impl/PlanCompiler.java  |   9 +
 .../runtime/base/IPushRuntimeFactory.java          |   1 +
 .../runtime/base/ProfiledPushRuntime.java          |  95 ++++++++
 .../meta/AlgebricksMetaOperatorDescriptor.java     | 113 +++++++++-
 .../runtime/operators/meta/PipelineAssembler.java  |  24 +-
 .../operators/meta/SubplanRuntimeFactory.java      |  29 ++-
 .../api/dataflow/ISelfProfilingNodePushable.java   |  15 +-
 .../api/dataflow/IStatsContainingNodePushable.java |  12 +-
 .../apache/hyracks/api/dataflow/ITimedWriter.java  |  15 +-
 .../hyracks/api/dataflow/ProfiledFrameWriter.java  |  59 ++---
 .../api/dataflow/ProfiledOperatorNodePushable.java |  49 +++--
 .../hyracks/api/job/profiling/IOperatorStats.java  |  16 +-
 .../api/job/profiling/NoOpOperatorStats.java       |  16 +-
 .../hyracks/api/job/profiling/OperatorStats.java   |  45 ++--
 .../runtime/SuperActivityOperatorNodePushable.java |  17 +-
 .../common/job/profiling/StatsCollector.java       |  12 +-
 .../common/job/profiling/om/JobProfile.java        |   4 +-
 .../common/job/profiling/om/TaskProfile.java       |   6 +-
 ...aryOutputIntrospectingOperatorNodePushable.java |  14 +-
 39 files changed, 1002 insertions(+), 224 deletions(-)

diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 9517cf06ca,0cc27903e4..9bd65a694e
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@@ -203,9 -204,10 +205,10 @@@ public class APIFramework 
      }
  
      public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
 -            Query query, int varCounter, String outputDatasetName, SessionOutput output,
 -            ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer,
 -            IWarningCollector warningCollector, IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags)
 +            Query query, int varCounter, String outputDatasetName, SessionOutput output, ICompiledStatement statement,
 +            Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, IWarningCollector warningCollector,
-             IRequestParameters requestParameters) throws AlgebricksException, ACIDException {
++            IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags)
+             throws AlgebricksException, ACIDException {
  
          // establish facts
          final boolean isQuery = query != null;
@@@ -345,10 -346,13 +348,13 @@@
          }
  
          if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN) || isExplainOnly) {
 -            if (isQuery || isLoad) {
 +            if (isQuery || isLoad || isCopy) {
                  generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
                          cboMode);
-                 lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+                 if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+                     lastPlan =
+                             new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+                 }
              }
          }
  
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d52429f1d6,e602b761d8..a99fc22472
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@@ -3887,181 -3680,6 +3887,181 @@@ public class QueryTranslator extends Ab
          }
      }
  
 +    protected Map<String, String> createExternalDataPropertiesForCopyFromStmt(String databaseName,
 +            DataverseName dataverseName, CopyFromStatement copyFromStatement, Datatype itemType,
 +            MetadataTransactionContext mdTxnCtx, MetadataProvider md) throws AlgebricksException {
 +        ExternalDetailsDecl edd = copyFromStatement.getExternalDetails();
 +        Map<String, String> properties = copyFromStatement.getExternalDetails().getProperties();
 +        String path = copyFromStatement.getPath();
 +        String pathKey = ExternalDataUtils.getPathKey(edd.getAdapter());
 +        properties.put(pathKey, path);
 +        return properties;
 +    }
 +
 +    protected void handleCopyFromStatement(MetadataProvider metadataProvider, Statement stmt,
 +            IHyracksClientConnection hcc) throws Exception {
 +        CopyFromStatement copyStmt = (CopyFromStatement) stmt;
 +        String datasetName = copyStmt.getDatasetName();
 +        metadataProvider.validateDatabaseObjectName(copyStmt.getNamespace(), datasetName, copyStmt.getSourceLocation());
 +        Namespace stmtActiveNamespace = getActiveNamespace(copyStmt.getNamespace());
 +        DataverseName dataverseName = stmtActiveNamespace.getDataverseName();
 +        String databaseName = stmtActiveNamespace.getDatabaseName();
 +        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
 +        boolean bActiveTxn = true;
 +        metadataProvider.setMetadataTxnContext(mdTxnCtx);
 +        lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), databaseName, dataverseName,
 +                datasetName);
 +        JobId jobId = null;
 +        boolean atomic = false;
 +        try {
 +            metadataProvider.setWriteTransaction(true);
 +            Dataset dataset = metadataProvider.findDataset(databaseName, dataverseName, copyStmt.getDatasetName());
 +            if (dataset == null) {
 +                throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, stmt.getSourceLocation(),
 +                        datasetName,
 +                        MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
 +            }
 +            Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDatabaseName(),
 +                    dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
 +            // Copy statement with csv files will have a type expression
 +            if (copyStmt.getTypeExpr() != null) {
 +                TypeExpression itemTypeExpr = copyStmt.getTypeExpr();
 +                Triple<Namespace, String, Boolean> itemTypeQualifiedName = extractDatasetItemTypeName(
 +                        stmtActiveNamespace, datasetName, itemTypeExpr, false, stmt.getSourceLocation());
 +                Namespace itemTypeNamespace = itemTypeQualifiedName.first;
 +                DataverseName itemTypeDataverseName = itemTypeNamespace.getDataverseName();
 +                String itemTypeName = itemTypeQualifiedName.second;
 +                String itemTypeDatabaseName = itemTypeNamespace.getDatabaseName();
 +                IAType itemTypeEntity = translateType(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName,
 +                        itemTypeExpr, mdTxnCtx);
 +                itemType =
 +                        new Datatype(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemTypeEntity, true);
 +            }
 +            ExternalDetailsDecl externalDetails = copyStmt.getExternalDetails();
 +            Map<String, String> properties = createExternalDataPropertiesForCopyFromStmt(databaseName, dataverseName,
 +                    copyStmt, itemType, mdTxnCtx, metadataProvider);
 +            ExternalDataUtils.normalize(properties);
 +            ExternalDataUtils.validate(properties);
 +            validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
 +                    appCtx);
 +            CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName,
 +                    copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
 +            cls.setSourceLocation(stmt.getSourceLocation());
 +            JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
-                     null, responsePrinter, warningCollector, null);
++                    null, responsePrinter, warningCollector, null, jobFlags);
 +            afterCompile();
 +            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
 +            bActiveTxn = false;
 +            if (spec != null && !isCompileOnly()) {
 +                atomic = dataset.isAtomic();
 +                if (atomic) {
 +                    int numParticipatingNodes = appCtx.getNodeJobTracker()
 +                            .getJobParticipatingNodes(spec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
 +                            .size();
 +                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec,
 +                            LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
 +                    List<Integer> participatingDatasetIds = new ArrayList<>();
 +                    participatingDatasetIds.add(dataset.getDatasetId());
 +                    spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
 +                            numParticipatingNodes, numParticipatingPartitions));
 +                }
 +                jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
 +
 +                String nameBefore = Thread.currentThread().getName();
 +                try {
 +                    Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
 +                    hcc.waitForCompletion(jobId);
 +                } finally {
 +                    Thread.currentThread().setName(nameBefore);
 +                }
 +                if (atomic) {
 +                    globalTxManager.commitTransaction(jobId);
 +                }
 +            }
 +        } catch (Exception e) {
 +            if (atomic && jobId != null) {
 +                globalTxManager.abortTransaction(jobId);
 +            }
 +            if (bActiveTxn) {
 +                abort(e, e, mdTxnCtx);
 +            }
 +            throw e;
 +        } finally {
 +            metadataProvider.getLocks().unlock();
 +        }
 +    }
 +
 +    protected void handleCopyToStatement(MetadataProvider metadataProvider, Statement stmt,
 +            IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
 +            ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams,
 +            Stats stats) throws Exception {
 +        CopyToStatement copyTo = (CopyToStatement) stmt;
 +        final IRequestTracker requestTracker = appCtx.getRequestTracker();
 +        final ClientRequest clientRequest =
 +                (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
 +        final IMetadataLocker locker = new IMetadataLocker() {
 +            @Override
 +            public void lock() throws RuntimeDataException, InterruptedException {
 +                try {
 +                    compilationLock.readLock().lockInterruptibly();
 +                } catch (InterruptedException e) {
 +                    Thread.currentThread().interrupt();
 +                    ensureNotCancelled(clientRequest);
 +                    throw e;
 +                }
 +            }
 +
 +            @Override
 +            public void unlock() {
 +                metadataProvider.getLocks().unlock();
 +                compilationLock.readLock().unlock();
 +            }
 +        };
 +        final IStatementCompiler compiler = () -> {
 +            long compileStart = System.nanoTime();
 +            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
 +            boolean bActiveTxn = true;
 +            metadataProvider.setMetadataTxnContext(mdTxnCtx);
 +            try {
 +                ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl();
 +                edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd,
 +                        ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
 +                        metadataProvider));
 +
 +                Map<VarIdentifier, IAObject> externalVars = createExternalVariables(copyTo, stmtParams);
 +                // Query Rewriting (happens under the same ongoing metadata transaction)
 +                LangRewritingContext langRewritingContext = createLangRewritingContext(metadataProvider,
 +                        declaredFunctions, null, warningCollector, copyTo.getVarCounter());
 +                Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(langRewritingContext,
 +                        copyTo, sessionOutput, true, true, externalVars.keySet());
 +
 +                CompiledStatements.CompiledCopyToStatement compiledCopyToStatement =
 +                        new CompiledStatements.CompiledCopyToStatement(copyTo);
 +
 +                // Query Compilation (happens under the same ongoing metadata transaction)
 +                final JobSpecification jobSpec = apiFramework.compileQuery(hcc, metadataProvider, copyTo.getQuery(),
 +                        rewrittenResult.second, null, sessionOutput, compiledCopyToStatement, externalVars,
-                         responsePrinter, warningCollector, requestParameters);
++                        responsePrinter, warningCollector, requestParameters, jobFlags);
 +                // update stats with count of compile-time warnings. needs to be adapted for multi-statement.
 +                stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount());
 +                afterCompile();
 +                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
 +                stats.setCompileTime(System.nanoTime() - compileStart);
 +                bActiveTxn = false;
 +                return isCompileOnly() ? null : jobSpec;
 +            } catch (Exception e) {
 +                LOGGER.log(Level.INFO, e.getMessage(), e);
 +                if (bActiveTxn) {
 +                    abort(e, e, mdTxnCtx);
 +                }
 +                throw e;
 +            }
 +        };
 +
 +        deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
 +                requestParameters, true, null);
 +    }
 +
      public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
              IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
              ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index f712bdd61e,f9d75b510c..6c7f2f0d63
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@@ -85,12 -85,30 +87,12 @@@ public final class DatasetStreamStatsOp
                  writer.open();
                  IStatsCollector coll = ctx.getStatsCollector();
                  if (coll != null) {
-                     coll.add(new OperatorStats(operatorName));
+                     coll.add(new OperatorStats(operatorName, INVALID_ODID));
                  }
                  INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
 -                indexStats = new HashMap<>();
 -                for (int i = 0; i < indexes.length; i++) {
 -                    IIndexDataflowHelper idxFlowHelper = indexes[i].create(serviceCtx, partition);
 -                    try {
 -                        idxFlowHelper.open();
 -                        ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance();
 -                        long numPages = 0;
 -                        synchronized (indexInstance.getOperationTracker()) {
 -                            for (ILSMDiskComponent component : indexInstance.getDiskComponents()) {
 -                                long componentSize = component.getComponentSize();
 -                                if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
 -                                    componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component)
 -                                            .getBloomFilter().getFileReference().getFile().length();
 -                                }
 -                                numPages += componentSize / indexInstance.getBufferCache().getPageSize();
 -                            }
 -                        }
 -                        indexStats.put(indexesNames[i], new IndexStats(indexesNames[i], numPages));
 -                    } finally {
 -                        idxFlowHelper.close();
 -                    }
 +                indexesStats = new HashMap<>();
 +                if (indexes.length > 0) {
 +                    gatherIndexesStats(serviceCtx, partitionsMap[partition]);
                  }
              }
  
diff --cc hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 3e4e09f74d,5c532d3665..d1a356cb45
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@@ -80,7 -82,8 +81,7 @@@ import org.apache.hyracks.algebricks.co
  import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
  import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
  import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
- import org.apache.hyracks.api.dataflow.ActivityId;
 -import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+ import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
  import org.apache.hyracks.api.exceptions.ErrorCode;
  
  import com.fasterxml.jackson.core.JsonFactory;