You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by xi...@apache.org on 2018/03/01 00:23:55 UTC

asterixdb git commit: [NO ISSUE][ING] Allow external UDF to use runtime parallelism

Repository: asterixdb
Updated Branches:
  refs/heads/master 0ef0e7648 -> a95a9a9e2


[NO ISSUE][ING] Allow external UDF to use runtime parallelism

- user model changes: no
- storage format changes: no
- interface changes:

Details:
1. Enable external UDFs in feeds to use the runtime parallelism.
2. Fix DefaultNodeGroupDomain, where the nodes should be a MultiSet rather
than a List, for comparison purposes.

Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2398
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a95a9a9e
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a95a9a9e
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a95a9a9e

Branch: refs/heads/master
Commit: a95a9a9e2eb098f9bbb0bfd8502d4ea51311fe05
Parents: 0ef0e76
Author: Xikui Wang <xk...@gmail.com>
Authored: Wed Feb 28 12:20:45 2018 -0800
Committer: Xikui Wang <xk...@gmail.com>
Committed: Wed Feb 28 16:23:38 2018 -0800

----------------------------------------------------------------------
 ...ceRandomPartitioningFeedComputationRule.java | 20 ++++++-------------
 .../asterix/app/function/FeedRewriter.java      |  8 ++++----
 .../results/query-ASTERIXDB-1343-2.plan         |  2 +-
 .../feed-with-external-function.3.update.sqlpp  |  2 ++
 .../metadata/declared/FeedDataSource.java       | 21 ++++++++++----------
 .../algebricks/algebricks-core/pom.xml          |  4 ++++
 .../operators/physical/AssignPOperator.java     | 16 +++++++--------
 .../properties/DefaultNodeGroupDomain.java      |  9 +++++++--
 8 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index dfb73ee..c41601b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -35,6 +35,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -66,19 +67,9 @@ public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebrai
         }
 
         ExchangeOperator exchangeOp = new ExchangeOperator();
-        INodeDomain domain = new INodeDomain() {
-            @Override
-            public boolean sameAs(INodeDomain domain) {
-                return domain == this;
-            }
+        INodeDomain runtimeDomain = feedDataSource.getComputationNodeDomain();
 
-            @Override
-            public Integer cardinality() {
-                return feedDataSource.getComputeCardinality();
-            }
-        };
-
-        exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(domain));
+        exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(runtimeDomain));
         op.getInputs().get(0).setValue(exchangeOp);
         exchangeOp.getInputs().add(new MutableObject<ILogicalOperator>(scanOp));
         ExecutionMode em = ((AbstractLogicalOperator) scanOp).getExecutionMode();
@@ -88,8 +79,9 @@ public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebrai
 
         AssignOperator assignOp = (AssignOperator) opRef.getValue();
         AssignPOperator assignPhyOp = (AssignPOperator) assignOp.getPhysicalOperator();
-        assignPhyOp.setCardinalityConstraint(domain.cardinality());
-
+        DefaultNodeGroupDomain computationNode = (DefaultNodeGroupDomain) runtimeDomain;
+        String[] nodes = computationNode.getNodes();
+        assignPhyOp.setLocationConstraint(nodes);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index 05dd53e..51aca5d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -163,10 +163,10 @@ public class FeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeC
         } else {
             keyAccessScalarFunctionCallExpression = null;
         }
-        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
-                aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
-                sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
-                context.getComputationNodeDomain(), feedConnection);
+        FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, targetDataset, feedOutputType, metaType,
+                pkTypes, keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
+                FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(),
+                feedConnection);
         feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
         return feedDataSource;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
index caa317d..5141e09 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
@@ -18,7 +18,7 @@
                   -- STREAM_PROJECT  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
-                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- STREAM_SELECT  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
index 883cd7a..0d46387 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
@@ -23,6 +23,8 @@
  */
 use externallibtest;
 
+SET `compiler.parallelism` "5";
+
 connect feed TweetFeed to dataset TweetsFeedIngest apply function `testlib#parseTweet`;
 
 start feed TweetFeed;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 325d23b..5c3ed56 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -56,17 +56,16 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
     private final FeedRuntimeType location;
     private final String targetDataset;
     private final String[] locations;
-    private final int computeCardinality;
+    private final INodeDomain computationNodeDomain;
     private final List<IAType> pkTypes;
     private final List<ScalarFunctionCallExpression> keyAccessExpression;
     private final FeedConnection feedConnection;
 
-    public FeedDataSource(MetadataProvider metadataProvider, Feed feed, DataSourceId id, String targetDataset,
-            IAType itemType, IAType metaType, List<IAType> pkTypes,
-            List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, FeedRuntimeType location,
-            String[] locations, INodeDomain domain, FeedConnection feedConnection) throws AlgebricksException {
+    public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType,
+            List<IAType> pkTypes, List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId,
+            FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection)
+            throws AlgebricksException {
         super(id, itemType, metaType, Type.FEED, domain);
-        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         this.feed = feed;
         this.targetDataset = targetDataset;
         this.sourceFeedId = sourceFeedId;
@@ -74,7 +73,7 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
         this.locations = locations;
         this.pkTypes = pkTypes;
         this.keyAccessExpression = keyAccessExpression;
-        this.computeCardinality = appCtx.getClusterStateManager().getParticipantNodes().size();
+        this.computationNodeDomain = domain;
         this.feedConnection = feedConnection;
         initFeedDataSource();
     }
@@ -119,10 +118,6 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
         }
     }
 
-    public int getComputeCardinality() {
-        return computeCardinality;
-    }
-
     public List<IAType> getPkTypes() {
         return pkTypes;
     }
@@ -208,4 +203,8 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
     public FeedConnection getFeedConnection() {
         return feedConnection;
     }
+
+    public INodeDomain getComputationNodeDomain() {
+        return computationNodeDomain;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/hyracks-fullstack/algebricks/algebricks-core/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 9f7d2bd..669988c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -85,5 +85,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 995f6e0..ccd27f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -44,7 +44,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 public class AssignPOperator extends AbstractPhysicalOperator {
 
     private boolean flushFramesRapidly;
-    private int cardinalityConstraint = 0;
+    private String[] locations;
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -93,10 +93,10 @@ public class AssignPOperator extends AbstractPhysicalOperator {
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        if (cardinalityConstraint > 0) {
-            AlgebricksCountPartitionConstraint countConstraint =
-                    new AlgebricksCountPartitionConstraint(cardinalityConstraint);
-            builder.contributeMicroOperator(assign, runtime, recDesc, countConstraint);
+        if (locations != null && locations.length > 0) {
+            AlgebricksAbsolutePartitionConstraint locationConstraint =
+                    new AlgebricksAbsolutePartitionConstraint(locations);
+            builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint);
         } else {
             builder.contributeMicroOperator(assign, runtime, recDesc);
         }
@@ -115,8 +115,8 @@ public class AssignPOperator extends AbstractPhysicalOperator {
         this.flushFramesRapidly = flushFramesRapidly;
     }
 
-    public void setCardinalityConstraint(int cardinality) {
-        this.cardinalityConstraint = cardinality;
+    public void setLocationConstraint(String[] locations) {
+        this.locations = locations;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a95a9a9e/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 719c70e..a0ef64e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,16 +18,17 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.properties;
 
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.collections4.MultiSet;
+import org.apache.commons.collections4.multiset.HashMultiSet;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
 
 public class DefaultNodeGroupDomain implements INodeDomain {
 
-    private List<String> nodes = new ArrayList<>();
+    private MultiSet<String> nodes = new HashMultiSet<>();
 
     public DefaultNodeGroupDomain(List<String> nodes) {
         this.nodes.addAll(nodes);
@@ -67,4 +68,8 @@ public class DefaultNodeGroupDomain implements INodeDomain {
     public Integer cardinality() {
         return nodes.size();
     }
+
+    public String[] getNodes() {
+        return nodes.toArray(new String[0]);
+    }
 }