You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/02/11 22:22:01 UTC

[asterixdb] branch master updated: [NO ISSUE][OTH] Add API To Ensure Request Requirements

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 47d2f36  [NO ISSUE][OTH] Add API To Ensure Request Requirements
47d2f36 is described below

commit 47d2f36d6461820fffc46709f1cf2cc7865df844
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Tue Feb 12 01:16:37 2019 +0300

    [NO ISSUE][OTH] Add API To Ensure Request Requirements
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Refactor IRequestParameters as ICommonRequestParameters
      and IRequestParameters to break cyclic dependencies.
    - Add new API to ensure request can be scheduled for execution.
    
    Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3181
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Integration-Tests: Murtadha Hubail <mh...@apache.org>
    Tested-by: Murtadha Hubail <mh...@apache.org>
---
 .../apache/asterix/translator/ClientRequest.java   | 13 ++---
 .../asterix/translator/IRequestParameters.java     | 32 +----------
 .../apache/asterix/translator/Receptionist.java    | 14 +++--
 .../translator/SchedulableClientRequest.java       | 66 ++++++++++++++++++++++
 .../asterix/app/translator/QueryTranslator.java    | 21 ++++---
 .../http/servlet/QueryCancellationServletTest.java |  9 ++-
 .../common/api/ICommonRequestParameters.java}      | 44 ++-------------
 .../apache/asterix/common/api/IReceptionist.java   | 20 ++++---
 ...tionist.java => ISchedulableClientRequest.java} | 41 ++++++++------
 9 files changed, 141 insertions(+), 119 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 014bf3c..fe6aeea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,9 +18,7 @@
  */
 package org.apache.asterix.translator;
 
-import java.util.Map;
-
-import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,11 +33,10 @@ public class ClientRequest extends BaseClientRequest {
     protected Thread executor;
     protected String clientContextId;
 
-    public ClientRequest(IRequestReference requestReference, String clientContextId, String statement,
-            Map<String, String> optionalParameters) {
-        super(requestReference);
-        this.clientContextId = clientContextId;
-        this.statement = statement;
+    public ClientRequest(ICommonRequestParameters requestParameters) {
+        super(requestParameters.getRequestReference());
+        this.clientContextId = requestParameters.getClientContextId();
+        this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 86ba301..6e41cd2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -20,12 +20,13 @@ package org.apache.asterix.translator;
 
 import java.util.Map;
 
+import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.api.result.IResultSet;
 
-public interface IRequestParameters {
+public interface IRequestParameters extends ICommonRequestParameters {
 
     /**
      * @return A Resultset client object that is used to read the results.
@@ -50,36 +51,7 @@ public interface IRequestParameters {
     IStatementExecutor.ResultMetadata getOutMetadata();
 
     /**
-     * @return the client context id for the query
-     */
-    String getClientContextId();
-
-    /**
-     * @return Optional request parameters. Otherwise null.
-     */
-    Map<String, String> getOptionalParameters();
-
-    /**
      * @return Statement parameters
      */
     Map<String, IAObject> getStatementParameters();
-
-    /**
-     * @return true if the request accepts multiple statements. Otherwise, false.
-     */
-    boolean isMultiStatement();
-
-    /**
-     * Gets the statement the client provided with the request
-     *
-     * @return the request statement
-     */
-    String getStatement();
-
-    /**
-     * The request reference of this {@link IRequestParameters}
-     *
-     * @return the request reference
-     */
-    IRequestReference getRequestReference();
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index c174c02..52aab20 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -18,15 +18,15 @@
  */
 package org.apache.asterix.translator;
 
-import java.util.Map;
 import java.util.UUID;
 
 import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.ISchedulableClientRequest;
 import org.apache.asterix.common.api.RequestReference;
 import org.apache.http.HttpHeaders;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.util.NetworkUtil;
 
@@ -48,8 +48,12 @@ public class Receptionist implements IReceptionist {
     }
 
     @Override
-    public IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
-            Map<String, String> optionalParameters) throws HyracksDataException {
-        return new ClientRequest(requestRef, clientContextId, statement, optionalParameters);
+    public IClientRequest requestReceived(ICommonRequestParameters requestParameters) {
+        return new ClientRequest(requestParameters);
+    }
+
+    @Override
+    public void ensureSchedulable(ISchedulableClientRequest schedulableRequest) {
+        // currently we don't have any restrictions
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java
new file mode 100644
index 0000000..ca04463
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.ICommonRequestParameters;
+import org.apache.asterix.common.api.ISchedulableClientRequest;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class SchedulableClientRequest implements ISchedulableClientRequest {
+
+    private final IClientRequest clientRequest;
+    private final JobSpecification jobSpec;
+    private final IMetadataProvider metadataProvider;
+    private final ICommonRequestParameters requestParameters;
+
+    private SchedulableClientRequest(IClientRequest clientRequest, ICommonRequestParameters requestParameters,
+            IMetadataProvider metadataProvider, JobSpecification jobSpec) {
+        this.clientRequest = clientRequest;
+        this.requestParameters = requestParameters;
+        this.metadataProvider = metadataProvider;
+        this.jobSpec = jobSpec;
+    }
+
+    public static SchedulableClientRequest of(IClientRequest clientRequest, ICommonRequestParameters requestParameters,
+            IMetadataProvider metadataProvider, JobSpecification jobSpec) {
+        return new SchedulableClientRequest(clientRequest, requestParameters, metadataProvider, jobSpec);
+    }
+
+    @Override
+    public IClientRequest getClientRequest() {
+        return clientRequest;
+    }
+
+    @Override
+    public ICommonRequestParameters getRequestParameters() {
+        return requestParameters;
+    }
+
+    @Override
+    public JobSpecification getJobSpecification() {
+        return jobSpec;
+    }
+
+    @Override
+    public IMetadataProvider getMetadataProvider() {
+        return metadataProvider;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f89b2db..c49fcdd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -163,6 +163,7 @@ import org.apache.asterix.translator.ExecutionPlans;
 import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SchedulableClientRequest;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.TypeTranslator;
@@ -2499,7 +2500,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
                 executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
-                        requestParameters, cancellable, resultSetId, printed));
+                        requestParameters, cancellable, resultSetId, printed, metadataProvider));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -2514,7 +2515,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     sessionOutput.release();
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
-                }, requestParameters, cancellable, appCtx);
+                }, requestParameters, cancellable, appCtx, metadataProvider);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -2524,7 +2525,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         outMetadata.getResultSets()
                                 .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType()));
                     }
-                }, requestParameters, cancellable, appCtx);
+                }, requestParameters, cancellable, appCtx, metadataProvider);
                 break;
             default:
                 break;
@@ -2552,7 +2553,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
             ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
-            ResultSetId resultSetId, MutableBoolean printed) {
+            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
             createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
@@ -2563,7 +2564,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     printed.setTrue();
                     printed.notify();
                 }
-            }, requestParameters, cancellable, appCtx);
+            }, requestParameters, cancellable, appCtx, metadataProvider);
         } catch (Exception e) {
             if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
@@ -2594,7 +2595,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
     private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
-            IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx) throws Exception {
+            IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
+            MetadataProvider metadataProvider) throws Exception {
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -2607,6 +2609,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             if (cancellable) {
                 clientRequest.markCancellable();
             }
+            final SchedulableClientRequest schedulableRequest =
+                    SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
+            appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (jId != null) {
@@ -2946,9 +2951,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     }
 
     protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
-        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(
-                requestParameters.getRequestReference(), requestParameters.getClientContextId(),
-                requestParameters.getStatement(), requestParameters.getOptionalParameters());
+        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
         appCtx.getRequestTracker().track(clientRequest);
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 1a526b2..2eef241 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.app.translator.RequestParameters;
 import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.runtime.utils.RequestTracker;
@@ -66,7 +67,9 @@ public class QueryCancellationServletTest {
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
         final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
-        ClientRequest request = new ClientRequest(requestReference, "1", "select 1;", new HashMap<>());
+        RequestParameters requestParameters =
+                new RequestParameters(requestReference, "select 1", null, null, null, null, "1", null, null, true);
+        ClientRequest request = new ClientRequest(requestParameters);
         request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
@@ -81,7 +84,9 @@ public class QueryCancellationServletTest {
 
         // Tests the case that the job cancellation hit some exception from Hyracks.
         final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
-        ClientRequest request2 = new ClientRequest(requestReference2, "2", "select 1;", new HashMap<>());
+        requestParameters =
+                new RequestParameters(requestReference2, "select 1", null, null, null, null, "2", null, null, true);
+        ClientRequest request2 = new ClientRequest(requestParameters);
         request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java
similarity index 57%
copy from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java
index 86ba301..8daff1d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java
@@ -16,41 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.translator;
+package org.apache.asterix.common.api;
 
 import java.util.Map;
 
-import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.api.result.IResultSet;
-
-public interface IRequestParameters {
-
-    /**
-     * @return A Resultset client object that is used to read the results.
-     */
-    IResultSet getResultSet();
+public interface ICommonRequestParameters {
 
     /**
-     * Gets the required result properties of the request.
+     * The request reference of this {@link ICommonRequestParameters}
      *
-     * @return the result properties
-     */
-    ResultProperties getResultProperties();
-
-    /**
-     * @return a reference to write the stats of executed queries
-     */
-    Stats getStats();
-
-    /**
-     * @return a reference to write the metadata of executed queries
+     * @return the request reference
      */
-    IStatementExecutor.ResultMetadata getOutMetadata();
+    IRequestReference getRequestReference();
 
     /**
-     * @return the client context id for the query
+     * @return the client context id for the request
      */
     String getClientContextId();
 
@@ -60,11 +40,6 @@ public interface IRequestParameters {
     Map<String, String> getOptionalParameters();
 
     /**
-     * @return Statement parameters
-     */
-    Map<String, IAObject> getStatementParameters();
-
-    /**
      * @return true if the request accepts multiple statements. Otherwise, false.
      */
     boolean isMultiStatement();
@@ -75,11 +50,4 @@ public interface IRequestParameters {
      * @return the request statement
      */
     String getStatement();
-
-    /**
-     * The request reference of this {@link IRequestParameters}
-     *
-     * @return the request reference
-     */
-    IRequestReference getRequestReference();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 51df306..95ed22e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.common.api;
 
-import java.util.Map;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 
@@ -36,13 +34,17 @@ public interface IReceptionist {
     /**
      * Generates a {@link IClientRequest} based on the requests parameters
      *
-     * @param requestRef
-     * @param clientContextId
-     * @param statement
-     * @param getOptionalParameters
-     * @return A client request
+     * @param requestParameters
+     * @return the client request
+     * @throws HyracksDataException
+     */
+    IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
+
+    /**
+     * Ensures a client's request can be executed before its job is started
+     *
+     * @param schedulableRequest
      * @throws HyracksDataException
      */
-    IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
-            Map<String, String> getOptionalParameters) throws HyracksDataException;
+    void ensureSchedulable(ISchedulableClientRequest schedulableRequest) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java
similarity index 52%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java
index 51df306..7723550 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java
@@ -18,31 +18,36 @@
  */
 package org.apache.asterix.common.api;
 
-import java.util.Map;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.http.api.IServletRequest;
+public interface ISchedulableClientRequest {
 
-public interface IReceptionist {
+    /**
+     * Gets the client request
+     *
+     * @return the client request
+     */
+    IClientRequest getClientRequest();
+
+    /**
+     * Gets the request common parameters
+     *
+     * @return the request common parameters
+     */
+    ICommonRequestParameters getRequestParameters();
 
     /**
-     * Generates a request reference based on {@code request}
+     * Gets the request's job specification
      *
-     * @param request
-     * @return a request reference representing the request
+     * @return
      */
-    IRequestReference welcome(IServletRequest request);
+    JobSpecification getJobSpecification();
 
     /**
-     * Generates a {@link IClientRequest} based on the requests parameters
+     * Gets the metadata provider used to execute this request
      *
-     * @param requestRef
-     * @param clientContextId
-     * @param statement
-     * @param getOptionalParameters
-     * @return A client request
-     * @throws HyracksDataException
+     * @return the metadata provider
      */
-    IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
-            Map<String, String> getOptionalParameters) throws HyracksDataException;
-}
+    IMetadataProvider getMetadataProvider();
+}
\ No newline at end of file