You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in this text rendering.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2018/01/29 23:36:41 UTC

asterixdb git commit: [NO ISSUE] Readability improvements

Repository: asterixdb
Updated Branches:
  refs/heads/master 06f16caba -> f12534862


[NO ISSUE] Readability improvements

Change-Id: I8b27805be1668fe6591c442fcd44020a418c2931
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2333
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Integration-Tests: Murtadha Hubail <mh...@apache.org>
Tested-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f1253486
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f1253486
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f1253486

Branch: refs/heads/master
Commit: f12534862478e2952fde8f4d280c13e872cf5b89
Parents: 06f16ca
Author: Till Westmann <ti...@apache.org>
Authored: Sun Jan 28 21:25:03 2018 -0800
Committer: Till Westmann <ti...@apache.org>
Committed: Mon Jan 29 15:36:05 2018 -0800

----------------------------------------------------------------------
 .../apache/asterix/api/common/APIFramework.java | 135 ++++++++++---------
 .../org/apache/asterix/utils/ResourceUtils.java |  20 +--
 .../metadata/declared/MetadataProvider.java     |  16 +--
 3 files changed, 86 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1253486/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 0f91275..a18277e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -20,7 +20,6 @@ package org.apache.asterix.api.common;
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -95,6 +94,7 @@ import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
 import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -106,6 +106,7 @@ import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.control.common.config.OptionTypes;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.collect.ImmutableSet;
 
 /**
@@ -201,63 +202,48 @@ public class APIFramework {
     }
 
     public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
-            Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement)
-            throws AlgebricksException, RemoteException, ACIDException {
+            Query query, int varCounter, String outputDatasetName, SessionOutput output,
+            ICompiledDmlStatement statement) throws AlgebricksException, ACIDException {
+
+        // establish facts
+        final boolean isQuery = query != null;
+        final boolean isLoad = statement != null && statement.getKind() == Statement.Kind.LOAD;
 
         SessionConfig conf = output.config();
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
             output.out().println();
 
             printPlanPrefix(output, "Rewritten expression tree");
-            if (rwQ != null) {
-                rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
+            if (isQuery) {
+                query.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
             }
             printPlanPostfix(output);
         }
 
-        TxnId txnId = TxnIdFactory.create();
+        final TxnId txnId = TxnIdFactory.create();
         metadataProvider.setTxnId(txnId);
         ILangExpressionToPlanTranslator t =
                 translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter);
 
-        ILogicalPlan plan;
-        // statement = null when it's a query
-        if (statement == null || statement.getKind() != Statement.Kind.LOAD) {
-            plan = t.translate(rwQ, outputDatasetName, statement);
-        } else {
-            plan = t.translateLoad(statement);
-        }
+        ILogicalPlan plan = isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement);
 
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
             output.out().println();
 
             printPlanPrefix(output, "Logical plan");
-            if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
+            if (isQuery || isLoad) {
                 PlanPrettyPrinter.printPlan(plan, getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
             }
             printPlanPostfix(output);
         }
         CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
-        int frameSize = compilerProperties.getFrameSize();
-        Map<String, String> querySpecificConfig = metadataProvider.getConfig();
-        validateConfig(querySpecificConfig); // Validates the user-overridden query parameters.
-        int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
-                querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
-                compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT);
-        int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-                querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
-                compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY);
-        int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
-                querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
-                compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN);
-        OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
-        OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
-        OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
-        OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
+        Map<String, String> querySpecificConfig = validateConfig(metadataProvider.getConfig());
+        final PhysicalOptimizationConfig physOptConf =
+                getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig);
 
         HeuristicCompilerFactoryBuilder builder =
                 new HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE);
-        builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
+        builder.setPhysicalOptimizationConfig(physOptConf);
         builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites(metadataProvider.getApplicationContext()));
         builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites(metadataProvider.getApplicationContext()));
         IDataFormat format = metadataProvider.getDataFormat();
@@ -285,7 +271,7 @@ public class APIFramework {
                     PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
                 } else {
                     printPlanPrefix(output, "Optimized logical plan");
-                    if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
+                    if (isQuery || isLoad) {
                         PlanPrettyPrinter.printPlan(plan,
                                 getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
                     }
@@ -293,7 +279,7 @@ public class APIFramework {
                 }
             }
         }
-        if (rwQ != null && rwQ.isExplain()) {
+        if (isQuery && query.isExplain()) {
             try {
                 LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
@@ -318,25 +304,7 @@ public class APIFramework {
         builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
         builder.setMissingWriterFactory(format.getMissingWriterFactory());
         builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
-
-        final SessionConfig.OutputFormat outputFormat = conf.fmt();
-        switch (outputFormat) {
-            case LOSSLESS_JSON:
-                builder.setPrinterProvider(format.getLosslessJSONPrinterFactoryProvider());
-                break;
-            case CSV:
-                builder.setPrinterProvider(format.getCSVPrinterFactoryProvider());
-                break;
-            case ADM:
-                builder.setPrinterProvider(format.getADMPrinterFactoryProvider());
-                break;
-            case CLEAN_JSON:
-                builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider());
-                break;
-            default:
-                throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat);
-        }
-
+        builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt()));
         builder.setSerializerDeserializerProvider(format.getSerdeProvider());
         builder.setTypeTraitProvider(format.getTypeTraitProvider());
         builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
@@ -345,24 +313,66 @@ public class APIFramework {
                 new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
         JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);
 
-        // When the top-level statement is a query, the statement parameter is null.
-        if (statement == null) {
+        if (isQuery) {
             // Sets a required capacity, only for read-only queries.
             // DDLs and DMLs are considered not that frequent.
             // limit the computation locations to the locations that will be used in the query
-            final AlgebricksAbsolutePartitionConstraint jobLocations = getJobLocations(spec,
-                    metadataProvider.getApplicationContext().getNodeJobTracker(), computationLocations);
-            final IClusterCapacity jobRequiredCapacity = ResourceUtils.getRequiredCapacity(plan, jobLocations,
-                    sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize);
+            final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
+            final AlgebricksAbsolutePartitionConstraint jobLocations =
+                    getJobLocations(spec, nodeJobTracker, computationLocations);
+            final IClusterCapacity jobRequiredCapacity =
+                    ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
             spec.setRequiredClusterCapacity(jobRequiredCapacity);
         }
 
+        printJobSpec(query, spec, conf, output);
+        return spec;
+    }
+
+    protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(CompilerProperties compilerProperties,
+            Map<String, String> querySpecificConfig) throws AlgebricksException {
+        int frameSize = compilerProperties.getFrameSize();
+        int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
+                querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
+                compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT);
+        int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
+                querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
+                compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+        int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
+                querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
+                compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN);
+        final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.getPhysicalOptimizationConfig();
+        physOptConf.setFrameSize(frameSize);
+        physOptConf.setMaxFramesExternalSort(sortFrameLimit);
+        physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit);
+        physOptConf.setMaxFramesForJoin(joinFrameLimit);
+        return physOptConf;
+    }
+
+    protected IPrinterFactoryProvider getPrinterFactoryProvider(IDataFormat format,
+            SessionConfig.OutputFormat outputFormat) throws AlgebricksException {
+        switch (outputFormat) {
+            case LOSSLESS_JSON:
+                return format.getLosslessJSONPrinterFactoryProvider();
+            case CSV:
+                return format.getCSVPrinterFactoryProvider();
+            case ADM:
+                return format.getADMPrinterFactoryProvider();
+            case CLEAN_JSON:
+                return format.getCleanJSONPrinterFactoryProvider();
+            default:
+                throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat);
+        }
+    }
+
+    protected void printJobSpec(Query rwQ, JobSpecification spec, SessionConfig conf, SessionOutput output)
+            throws AlgebricksException {
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
             printPlanPrefix(output, "Hyracks job");
             if (rwQ != null) {
                 try {
-                    output.out().println(
-                            new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
+                    final ObjectWriter objectWriter = new ObjectMapper().writerWithDefaultPrettyPrinter();
+                    output.out().println(objectWriter.writeValueAsString(spec.toJSON()));
                 } catch (IOException e) {
                     throw new AlgebricksException(e);
                 }
@@ -370,7 +380,6 @@ public class APIFramework {
             }
             printPlanPostfix(output);
         }
-        return spec;
     }
 
     private AbstractLogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor(SessionConfig.PlanFormat planFormat,
@@ -390,7 +399,6 @@ public class APIFramework {
             double duration = (endTime - startTime) / 1000.00;
             out.println("<pre>Duration: " + duration + " sec</pre>");
         }
-
     }
 
     public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception {
@@ -499,12 +507,13 @@ public class APIFramework {
     }
 
     // Validates if the query contains unsupported query parameters.
-    private static void validateConfig(Map<String, String> config) throws AlgebricksException {
+    private static Map<String, String> validateConfig(Map<String, String> config) throws AlgebricksException {
         for (String parameterName : config.keySet()) {
             if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) {
                 throw AsterixException.create(ErrorCode.COMPILATION_UNSUPPORTED_QUERY_PARAMETER, parameterName);
             }
         }
+        return config;
     }
 
     public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1253486/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 89c4c76..ccda1e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 
@@ -40,21 +41,20 @@ public class ResourceUtils {
      *            a given query plan.
      * @param computationLocations,
      *            the partitions for computation.
-     * @param sortFrameLimit,
-     *            the frame limit for one sorter partition.
-     * @param groupFrameLimit,
-     *            the frame limit for one group-by partition.
-     * @param joinFrameLimit
-     *            the frame limit for one joiner partition.
-     * @param frameSize
-     *            the frame size used in query execution.
+     * @param physicalOptimizationConfig,
+     *            a PhysicalOptimizationConfig.
      * @return the required cluster capacity for executing the query.
      * @throws AlgebricksException
      *             if the query plan is malformed.
      */
     public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
-            AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit,
-            int joinFrameLimit, int frameSize) throws AlgebricksException {
+            AlgebricksAbsolutePartitionConstraint computationLocations,
+            PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
+        final int frameSize = physicalOptimizationConfig.getFrameSize();
+        final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
+        final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
+        final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
+
         // Creates a cluster capacity visitor.
         IClusterCapacity clusterCapacity = new ClusterCapacity();
         RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1253486/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 62337ad..f740d09 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -355,8 +355,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             throws AlgebricksException {
         DataSource source = findDataSource(dataSourceId);
         Dataset dataset = ((DatasetDataSource) source).getDataset();
-        String indexName = indexId;
-        Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+        Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexId);
         return (secondaryIndex != null)
                 ? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this)
                 : null;
@@ -381,26 +380,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
             List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
-        try {
-            return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables,
-                    projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec,
-                    implConfig);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
+        return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, projectVariables,
+                projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec, implConfig);
     }
 
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
             JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
             throws AlgebricksException {
         ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
-        AlgebricksPartitionConstraint constraint;
         try {
-            constraint = adapterFactory.getPartitionConstraint();
+            return new Pair<>(dataScanner, adapterFactory.getPartitionConstraint());
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
-        return new Pair<>(dataScanner, constraint);
     }
 
     public Dataverse findDataverse(String dataverseName) throws AlgebricksException {