You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2023/11/18 17:29:44 UTC
(asterixdb) 04/05: Merge branch 'gerrit/trinity' into 'master'
This is an automated email from the ASF dual-hosted git repository.
mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 9607ea5af9cfd42e5c78ec687b79ac149db91904
Merge: 9b6164e7d2 6c1e847f19
Author: Michael Blow <mi...@couchbase.com>
AuthorDate: Sat Nov 18 09:02:05 2023 -0500
Merge branch 'gerrit/trinity' into 'master'
Change-Id: I7e0d18ecfdd8d1ea72078f13997c1754fdb8b1cd
.../apache/asterix/api/common/APIFramework.java | 12 +-
.../asterix/app/result/JobResultCallback.java | 2 +-
.../asterix/app/translator/QueryTranslator.java | 10 +-
.../test/runtime/ProfiledExecutionTest.java | 3 -
.../src/test/resources/runtimets/profiled.xml | 6 +
.../non-unary-subplan.1.ddl.sqlpp} | 14 +-
.../non-unary-subplan.2.update.sqlpp} | 15 +-
.../non-unary-subplan.3.profile.sqlpp} | 18 +-
.../profile/sleep/sleep.5.profile.sqlpp | 2 +-
.../profile/full-scan/full-scan.3.regexjson | 18 ++
.../non-unary-subplan.3.regexjson | 241 +++++++++++++++++++++
.../results/profile/sleep/sleep.3.regexjson | 28 ++-
.../results/profile/sleep/sleep.4.regexjson | 54 ++++-
.../results/profile/sleep/sleep.5.regexjson | 60 ++++-
.../common/ClosedRecordConstructorEvalFactory.java | 5 +
.../DatasetStreamStatsOperatorDescriptor.java | 4 +-
.../api/HeuristicCompilerFactoryBuilder.java | 13 ++
.../hyracks/algebricks/compiler/api/ICompiler.java | 6 +
.../LogicalOperatorPrettyPrintVisitorJson.java | 118 ++++++++--
.../algebricks/core/jobgen/impl/JobBuilder.java | 46 +++-
.../algebricks/core/jobgen/impl/PlanCompiler.java | 9 +
.../runtime/base/IPushRuntimeFactory.java | 1 +
.../runtime/base/ProfiledPushRuntime.java | 95 ++++++++
.../meta/AlgebricksMetaOperatorDescriptor.java | 113 +++++++++-
.../runtime/operators/meta/PipelineAssembler.java | 24 +-
.../operators/meta/SubplanRuntimeFactory.java | 29 ++-
.../api/dataflow/ISelfProfilingNodePushable.java | 15 +-
.../api/dataflow/IStatsContainingNodePushable.java | 12 +-
.../apache/hyracks/api/dataflow/ITimedWriter.java | 15 +-
.../hyracks/api/dataflow/ProfiledFrameWriter.java | 59 ++---
.../api/dataflow/ProfiledOperatorNodePushable.java | 49 +++--
.../hyracks/api/job/profiling/IOperatorStats.java | 16 +-
.../api/job/profiling/NoOpOperatorStats.java | 16 +-
.../hyracks/api/job/profiling/OperatorStats.java | 45 ++--
.../runtime/SuperActivityOperatorNodePushable.java | 17 +-
.../common/job/profiling/StatsCollector.java | 12 +-
.../common/job/profiling/om/JobProfile.java | 4 +-
.../common/job/profiling/om/TaskProfile.java | 6 +-
...aryOutputIntrospectingOperatorNodePushable.java | 14 +-
39 files changed, 1002 insertions(+), 224 deletions(-)
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 9517cf06ca,0cc27903e4..9bd65a694e
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@@ -203,9 -204,10 +205,10 @@@ public class APIFramework
}
public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
- Query query, int varCounter, String outputDatasetName, SessionOutput output,
- ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer,
- IWarningCollector warningCollector, IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags)
+ Query query, int varCounter, String outputDatasetName, SessionOutput output, ICompiledStatement statement,
+ Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, IWarningCollector warningCollector,
- IRequestParameters requestParameters) throws AlgebricksException, ACIDException {
++ IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags)
+ throws AlgebricksException, ACIDException {
// establish facts
final boolean isQuery = query != null;
@@@ -345,10 -346,13 +348,13 @@@
}
if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN) || isExplainOnly) {
- if (isQuery || isLoad) {
+ if (isQuery || isLoad || isCopy) {
generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
cboMode);
- lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+ if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+ lastPlan =
+ new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+ }
}
}
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d52429f1d6,e602b761d8..a99fc22472
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@@ -3887,181 -3680,6 +3887,181 @@@ public class QueryTranslator extends Ab
}
}
+ protected Map<String, String> createExternalDataPropertiesForCopyFromStmt(String databaseName,
+ DataverseName dataverseName, CopyFromStatement copyFromStatement, Datatype itemType,
+ MetadataTransactionContext mdTxnCtx, MetadataProvider md) throws AlgebricksException {
+ ExternalDetailsDecl edd = copyFromStatement.getExternalDetails();
+ Map<String, String> properties = copyFromStatement.getExternalDetails().getProperties();
+ String path = copyFromStatement.getPath();
+ String pathKey = ExternalDataUtils.getPathKey(edd.getAdapter());
+ properties.put(pathKey, path);
+ return properties;
+ }
+
+ protected void handleCopyFromStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws Exception {
+ CopyFromStatement copyStmt = (CopyFromStatement) stmt;
+ String datasetName = copyStmt.getDatasetName();
+ metadataProvider.validateDatabaseObjectName(copyStmt.getNamespace(), datasetName, copyStmt.getSourceLocation());
+ Namespace stmtActiveNamespace = getActiveNamespace(copyStmt.getNamespace());
+ DataverseName dataverseName = stmtActiveNamespace.getDataverseName();
+ String databaseName = stmtActiveNamespace.getDatabaseName();
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), databaseName, dataverseName,
+ datasetName);
+ JobId jobId = null;
+ boolean atomic = false;
+ try {
+ metadataProvider.setWriteTransaction(true);
+ Dataset dataset = metadataProvider.findDataset(databaseName, dataverseName, copyStmt.getDatasetName());
+ if (dataset == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, stmt.getSourceLocation(),
+ datasetName,
+ MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
+ }
+ Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDatabaseName(),
+ dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+ // Copy statement with csv files will have a type expression
+ if (copyStmt.getTypeExpr() != null) {
+ TypeExpression itemTypeExpr = copyStmt.getTypeExpr();
+ Triple<Namespace, String, Boolean> itemTypeQualifiedName = extractDatasetItemTypeName(
+ stmtActiveNamespace, datasetName, itemTypeExpr, false, stmt.getSourceLocation());
+ Namespace itemTypeNamespace = itemTypeQualifiedName.first;
+ DataverseName itemTypeDataverseName = itemTypeNamespace.getDataverseName();
+ String itemTypeName = itemTypeQualifiedName.second;
+ String itemTypeDatabaseName = itemTypeNamespace.getDatabaseName();
+ IAType itemTypeEntity = translateType(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName,
+ itemTypeExpr, mdTxnCtx);
+ itemType =
+ new Datatype(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemTypeEntity, true);
+ }
+ ExternalDetailsDecl externalDetails = copyStmt.getExternalDetails();
+ Map<String, String> properties = createExternalDataPropertiesForCopyFromStmt(databaseName, dataverseName,
+ copyStmt, itemType, mdTxnCtx, metadataProvider);
+ ExternalDataUtils.normalize(properties);
+ ExternalDataUtils.validate(properties);
+ validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
+ appCtx);
+ CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName,
+ copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
+ cls.setSourceLocation(stmt.getSourceLocation());
+ JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
- null, responsePrinter, warningCollector, null);
++ null, responsePrinter, warningCollector, null, jobFlags);
+ afterCompile();
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ if (spec != null && !isCompileOnly()) {
+ atomic = dataset.isAtomic();
+ if (atomic) {
+ int numParticipatingNodes = appCtx.getNodeJobTracker()
+ .getJobParticipatingNodes(spec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+ .size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec,
+ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
+ List<Integer> participatingDatasetIds = new ArrayList<>();
+ participatingDatasetIds.add(dataset.getDatasetId());
+ spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
+ numParticipatingNodes, numParticipatingPartitions));
+ }
+ jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+
+ String nameBefore = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+ hcc.waitForCompletion(jobId);
+ } finally {
+ Thread.currentThread().setName(nameBefore);
+ }
+ if (atomic) {
+ globalTxManager.commitTransaction(jobId);
+ }
+ }
+ } catch (Exception e) {
+ if (atomic && jobId != null) {
+ globalTxManager.abortTransaction(jobId);
+ }
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ }
+
+ protected void handleCopyToStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
+ ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams,
+ Stats stats) throws Exception {
+ CopyToStatement copyTo = (CopyToStatement) stmt;
+ final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ final ClientRequest clientRequest =
+ (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+ final IMetadataLocker locker = new IMetadataLocker() {
+ @Override
+ public void lock() throws RuntimeDataException, InterruptedException {
+ try {
+ compilationLock.readLock().lockInterruptibly();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ ensureNotCancelled(clientRequest);
+ throw e;
+ }
+ }
+
+ @Override
+ public void unlock() {
+ metadataProvider.getLocks().unlock();
+ compilationLock.readLock().unlock();
+ }
+ };
+ final IStatementCompiler compiler = () -> {
+ long compileStart = System.nanoTime();
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl();
+ edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd,
+ ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
+ metadataProvider));
+
+ Map<VarIdentifier, IAObject> externalVars = createExternalVariables(copyTo, stmtParams);
+ // Query Rewriting (happens under the same ongoing metadata transaction)
+ LangRewritingContext langRewritingContext = createLangRewritingContext(metadataProvider,
+ declaredFunctions, null, warningCollector, copyTo.getVarCounter());
+ Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(langRewritingContext,
+ copyTo, sessionOutput, true, true, externalVars.keySet());
+
+ CompiledStatements.CompiledCopyToStatement compiledCopyToStatement =
+ new CompiledStatements.CompiledCopyToStatement(copyTo);
+
+ // Query Compilation (happens under the same ongoing metadata transaction)
+ final JobSpecification jobSpec = apiFramework.compileQuery(hcc, metadataProvider, copyTo.getQuery(),
+ rewrittenResult.second, null, sessionOutput, compiledCopyToStatement, externalVars,
- responsePrinter, warningCollector, requestParameters);
++ responsePrinter, warningCollector, requestParameters, jobFlags);
+ // update stats with count of compile-time warnings. needs to be adapted for multi-statement.
+ stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount());
+ afterCompile();
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ stats.setCompileTime(System.nanoTime() - compileStart);
+ bActiveTxn = false;
+ return isCompileOnly() ? null : jobSpec;
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, e.getMessage(), e);
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ }
+ };
+
+ deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
+ requestParameters, true, null);
+ }
+
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index f712bdd61e,f9d75b510c..6c7f2f0d63
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@@ -85,12 -85,30 +87,12 @@@ public final class DatasetStreamStatsOp
writer.open();
IStatsCollector coll = ctx.getStatsCollector();
if (coll != null) {
- coll.add(new OperatorStats(operatorName));
+ coll.add(new OperatorStats(operatorName, INVALID_ODID));
}
INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
- indexStats = new HashMap<>();
- for (int i = 0; i < indexes.length; i++) {
- IIndexDataflowHelper idxFlowHelper = indexes[i].create(serviceCtx, partition);
- try {
- idxFlowHelper.open();
- ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance();
- long numPages = 0;
- synchronized (indexInstance.getOperationTracker()) {
- for (ILSMDiskComponent component : indexInstance.getDiskComponents()) {
- long componentSize = component.getComponentSize();
- if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
- componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component)
- .getBloomFilter().getFileReference().getFile().length();
- }
- numPages += componentSize / indexInstance.getBufferCache().getPageSize();
- }
- }
- indexStats.put(indexesNames[i], new IndexStats(indexesNames[i], numPages));
- } finally {
- idxFlowHelper.close();
- }
+ indexesStats = new HashMap<>();
+ if (indexes.length > 0) {
+ gatherIndexesStats(serviceCtx, partitionsMap[partition]);
}
}
diff --cc hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 3e4e09f74d,5c532d3665..d1a356cb45
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@@ -80,7 -82,8 +81,7 @@@ import org.apache.hyracks.algebricks.co
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
- import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+ import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import com.fasterxml.jackson.core.JsonFactory;