You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/28 03:28:40 UTC

asterixdb git commit: [ASTERIXDB-2269][RT] Use Job Locations To Estimate Resources

Repository: asterixdb
Updated Branches:
  refs/heads/master 1b5033353 -> a05657664


[ASTERIXDB-2269][RT] Use Job Locations To Estimate Resources

- user model changes: no
- storage format changes: no
- interface changes: yes
    - INodeJobTracker (+) getJobParticipatingNodes

Details:
- Use the locations of the nodes participating in the job, rather
  than all cluster locations, to calculate the job's required resources.
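- Add test case APIFrameworkTest#testJobLocations.
- Fix a typo in a method name: ResourceUtils#getRequiredCompacity
  is renamed to getRequiredCapacity.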

Change-Id: Iecd8e234aa52a9f324e64044e01477fb12dc14e6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2330
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a0565766
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a0565766
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a0565766

Branch: refs/heads/master
Commit: a05657664cb34fc79f4b94757e38718a689806f7
Parents: 1b50333
Author: Murtadha Hubail <mh...@apache.org>
Authored: Sat Jan 27 05:58:54 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Sat Jan 27 19:28:03 2018 -0800

----------------------------------------------------------------------
 .../apache/asterix/api/common/APIFramework.java | 21 +++++++++++++--
 .../org/apache/asterix/utils/ResourceUtils.java |  2 +-
 .../asterix/api/common/APIFrameworkTest.java    | 27 ++++++++++++++++++++
 .../asterix/common/api/INodeJobTracker.java     | 11 ++++++++
 .../runtime/job/listener/NodeJobTracker.java    | 14 ++++++----
 5 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0565766/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 4428e05..b0edb3e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -32,6 +33,7 @@ import java.util.Set;
 import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
 import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
 import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -100,6 +102,7 @@ import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.control.common.config.OptionTypes;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -346,8 +349,14 @@ public class APIFramework {
         if (statement == null) {
             // Sets a required capacity, only for read-only queries.
             // DDLs and DMLs are considered not that frequent.
-            spec.setRequiredClusterCapacity(ResourceUtils.getRequiredCompacity(plan, computationLocations,
-                    sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize));
+            // limit the computation locations to the locations that will be used in the query
+            final AlgebricksAbsolutePartitionConstraint jobLocations =
+                    getJobLocations(spec, metadataProvider.getApplicationContext().getNodeJobTracker(),
+                            computationLocations);
+            final IClusterCapacity jobRequiredCapacity = ResourceUtils
+                    .getRequiredCapacity(plan, jobLocations, sortFrameLimit, groupFrameLimit, joinFrameLimit,
+                            frameSize);
+            spec.setRequiredClusterCapacity(jobRequiredCapacity);
         }
 
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
@@ -499,4 +508,12 @@ public class APIFramework {
             }
         }
     }
+
+    public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec,
+            INodeJobTracker jobTracker, AlgebricksAbsolutePartitionConstraint clusterLocations) {
+        final Set<String> jobParticipatingNodes = jobTracker.getJobParticipatingNodes(spec);
+        return new AlgebricksAbsolutePartitionConstraint(
+                Arrays.stream(clusterLocations.getLocations()).filter(jobParticipatingNodes::contains)
+                        .toArray(String[]::new));
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0565766/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 61c1dfe..1763a98 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -52,7 +52,7 @@ public class ResourceUtils {
      * @throws AlgebricksException
      *             if the query plan is malformed.
      */
-    public static IClusterCapacity getRequiredCompacity(ILogicalPlan plan,
+    public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
             AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit,
             int joinFrameLimit, int frameSize)
             throws AlgebricksException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0565766/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
index e041021..90fc646 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/APIFrameworkTest.java
@@ -29,12 +29,18 @@ import java.util.Map;
 
 import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.runtime.job.listener.NodeJobTracker;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.job.JobSpecification;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import junit.extensions.PA;
 
@@ -173,4 +179,25 @@ public class APIFrameworkTest {
         Assert.assertTrue(loc.getLocations().length == 8);
     }
 
+    @Test
+    public void testJobLocations() {
+        final String nc1 = "nc1";
+        final String nc2 = "nc2";
+        final NodeJobTracker nodeJobTracker = new NodeJobTracker();
+        nodeJobTracker.notifyNodeJoin(nc1, null);
+        nodeJobTracker.notifyNodeJoin(nc2, null);
+
+        final JobSpecification jobSpec = new JobSpecification();
+        // add only nc1 to the job locations
+        final ConstantExpression nc1Location = new ConstantExpression(nc1);
+        final LValueConstraintExpression lValueMock = Mockito.mock(LValueConstraintExpression.class);
+        jobSpec.getUserConstraints().add(new Constraint(lValueMock, nc1Location));
+
+        final String[] clusterLocation = new String[] { nc1, nc2 };
+        final AlgebricksAbsolutePartitionConstraint jobLocations = APIFramework
+                .getJobLocations(jobSpec, nodeJobTracker, new AlgebricksAbsolutePartitionConstraint(clusterLocation));
+        // ensure nc2 wasn't included
+        Assert.assertEquals(1, jobLocations.getLocations().length);
+        Assert.assertEquals(nc1, jobLocations.getLocations()[0]);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0565766/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
index 4503620..9966c95 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
 
 public interface INodeJobTracker extends IJobLifecycleListener, IClusterLifecycleListener {
 
@@ -34,4 +35,14 @@ public interface INodeJobTracker extends IJobLifecycleListener, IClusterLifecycl
      * @return unmodifiable set of the node pending jobs.
      */
     Set<JobId> getPendingJobs(String nodeId);
+
+    /**
+     * Gets the set of nodes that will participate in the execution
+     * of the job. The nodes will include only nodes that are known
+     * to this {@link INodeJobTracker}
+     *
+     * @param spec
+     * @return The participating nodes in the job execution
+     */
+    Set<String> getJobParticipatingNodes(JobSpecification spec);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0565766/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index 75c5582..ff009dc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -45,11 +45,7 @@ public class NodeJobTracker implements INodeJobTracker {
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
-        final Set<String> matchedNodes = spec.getUserConstraints().stream().map(Constraint::getRValue)
-                .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
-                .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
-                .collect(Collectors.toSet());
-        matchedNodes.stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
+        getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
     }
 
     @Override
@@ -78,4 +74,12 @@ public class NodeJobTracker implements INodeJobTracker {
                 Collections.unmodifiableSet(nodeJobs.get(nodeId)) :
                 Collections.emptySet();
     }
+
+    @Override
+    public Set<String> getJobParticipatingNodes(JobSpecification spec) {
+        return spec.getUserConstraints().stream().map(Constraint::getRValue)
+                .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
+                .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
+                .collect(Collectors.toSet());
+    }
 }
\ No newline at end of file