You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2019/02/05 23:31:06 UTC

[asterixdb] branch master updated: [NO ISSUE] Add internal function jobs() to retrieve job information

This is an automated email from the ASF dual-hosted git repository.

tillw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ff509cd  [NO ISSUE] Add internal function jobs() to retrieve job information
ff509cd is described below

commit ff509cdddeaedf0fc7f37d50f7ace16766e5fa40
Author: Till Westmann <ti...@couchbase.com>
AuthorDate: Sun Feb 3 16:00:03 2019 -0800

    [NO ISSUE] Add internal function jobs() to retrieve job information
    
    - user model changes: added function jobs()
    - storage format changes: no
    - interface changes: no
    
    Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3115
    Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../app/function/JobSummariesDatasource.java       | 43 +++++++++++++
 .../asterix/app/function/JobSummariesFunction.java | 68 +++++++++++++++++++++
 .../asterix/app/function/JobSummariesReader.java   | 47 +++++++++++++++
 .../asterix/app/function/JobSummariesRewriter.java | 43 +++++++++++++
 .../app/message/GetJobSummariesRequest.java        | 70 ++++++++++++++++++++++
 .../app/message/GetJobSummariesResponse.java       | 50 ++++++++++++++++
 .../asterix/util/MetadataBuiltinFunctions.java     |  6 ++
 .../queries_sqlpp/misc/jobs/jobs.1.query.sqlpp     | 24 ++++++++
 .../runtimets/results/misc/jobs/jobs.1.regex       |  1 +
 .../test/resources/runtimets/testsuite_sqlpp.xml   |  5 ++
 10 files changed, 357 insertions(+)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
new file mode 100644
index 0000000..b92dc55
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class JobSummariesDatasource extends FunctionDataSource {
+
+    private static final DataSourceId JOB_SUMMARIES_DATASOURCE_ID = new DataSourceId(
+            JobSummariesRewriter.JOBSUMMARIES.getNamespace(), JobSummariesRewriter.JOBSUMMARIES.getName());
+
+    public JobSummariesDatasource(INodeDomain domain) throws AlgebricksException {
+        super(JOB_SUMMARIES_DATASOURCE_ID, domain);
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new JobSummariesFunction(AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations()));
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
new file mode 100644
index 0000000..82062e2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.GetJobSummariesRequest;
+import org.apache.asterix.app.message.GetJobSummariesResponse;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class JobSummariesFunction extends AbstractDatasourceFunction {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private static final long serialVersionUID = 1L;
+
+    public JobSummariesFunction(AlgebricksAbsolutePartitionConstraint locations) {
+        super(locations);
+    }
+
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+        INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker();
+        MessageFuture messageFuture = messageBroker.registerMessageFuture();
+        final long futureId = messageFuture.getFutureId();
+        GetJobSummariesRequest request = new GetJobSummariesRequest(serviceCtx.getNodeId(), futureId);
+        try {
+            messageBroker.sendMessageToPrimaryCC(request);
+            GetJobSummariesResponse response =
+                    (GetJobSummariesResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            return new JobSummariesReader(response.getSummaries());
+        } catch (Exception e) {
+            LOGGER.warn("Could no retrieve jobs info", e);
+            throw HyracksDataException.create(e);
+        } finally {
+            messageBroker.deregisterMessageFuture(futureId);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
new file mode 100644
index 0000000..d5a3272
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+
+public class JobSummariesReader extends FunctionReader {
+
+    private final String[] summaries;
+    private int pos = 0;
+
+    public JobSummariesReader(String[] summaries) {
+        this.summaries = summaries;
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return pos < summaries.length;
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        CharArrayRecord record = new CharArrayRecord();
+        record.set(summaries[pos++]);
+        record.endRecord();
+        return record;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
new file mode 100644
index 0000000..ef753b4
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class JobSummariesRewriter extends FunctionRewriter {
+
+    public static final FunctionIdentifier JOBSUMMARIES =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "jobs", 0);
+    public static final JobSummariesRewriter INSTANCE = new JobSummariesRewriter(JOBSUMMARIES);
+
+    private JobSummariesRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        return new JobSummariesDatasource(context.getComputationNodeDomain());
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
new file mode 100644
index 0000000..b885b13
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.work.GetJobSummariesJSONWork;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+public class GetJobSummariesRequest implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+
+    public GetJobSummariesRequest(String nodeId, long reqId) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+        GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager());
+        try {
+            ccs.getWorkQueue().scheduleAndSync(gjse);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure getting jobs", e);
+            throw HyracksDataException.create(e);
+        }
+        final ArrayNode gjseSummaries = gjse.getSummaries();
+        final int size = gjseSummaries.size();
+        String[] summaries = new String[size];
+        for (int i = 0; i < size; ++i) {
+            summaries[i] = gjseSummaries.get(i).toString();
+        }
+        GetJobSummariesResponse response = new GetJobSummariesResponse(reqId, summaries);
+        CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
new file mode 100644
index 0000000..7554d53
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GetJobSummariesResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String[] summaries;
+
+    public GetJobSummariesResponse(long reqId, String[] summaries) {
+        this.reqId = reqId;
+        this.summaries = summaries;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public String[] getSummaries() {
+        return summaries;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 3407d59..a8314ec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -22,6 +22,7 @@ import org.apache.asterix.app.function.ActiveRequestsRewriter;
 import org.apache.asterix.app.function.DatasetResourcesRewriter;
 import org.apache.asterix.app.function.DatasetRewriter;
 import org.apache.asterix.app.function.FeedRewriter;
+import org.apache.asterix.app.function.JobSummariesRewriter;
 import org.apache.asterix.app.function.PingRewriter;
 import org.apache.asterix.app.function.StorageComponentsRewriter;
 import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -60,6 +61,11 @@ public class MetadataBuiltinFunctions {
                 (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
         BuiltinFunctions.addUnnestFun(ActiveRequestsRewriter.ACTIVE_REQUESTS, true);
         BuiltinFunctions.addDatasourceFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, ActiveRequestsRewriter.INSTANCE);
+        // job-summaries function
+        BuiltinFunctions.addPrivateFunction(JobSummariesRewriter.JOBSUMMARIES,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+        BuiltinFunctions.addUnnestFun(JobSummariesRewriter.JOBSUMMARIES, true);
+        BuiltinFunctions.addDatasourceFunction(JobSummariesRewriter.JOBSUMMARIES, JobSummariesRewriter.INSTANCE);
     }
 
     private MetadataBuiltinFunctions() {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp
new file mode 100644
index 0000000..e8e898f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+SET `import-private-functions` "true";
+select value j
+from jobs() j
+order by j.`start-time` desc
+limit 1;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex
new file mode 100644
index 0000000..914f6cb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex
@@ -0,0 +1 @@
+/\{ "type": "job-summary", "job-id": "JID:.*", "create-time": \d*, "start-time": \d*, "end-time": \d*, "status": "RUNNING" \}/
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index ceee5f9..f753bd0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4987,6 +4987,11 @@
         <output-dir compare="Text">active_requests</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="misc">
+      <compilation-unit name="jobs">
+        <output-dir compare="Text">jobs</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index">
     <test-group name="index/validations">