You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2017/11/15 01:03:05 UTC
[2/3] asterixdb git commit: [ASTERIXDB-1911][HYR, RT,
CLUS] Fixes and Improvements for Deployed Jobs
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
index e0c5279..84a754a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
@@ -23,19 +23,12 @@ import java.io.Serializable;
public final class ActivityClusterId implements Serializable {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
-
private final int id;
- public ActivityClusterId(JobId jobId, int id) {
- this.jobId = jobId;
+ public ActivityClusterId(int id) {
this.id = id;
}
- public JobId getJobId() {
- return jobId;
- }
-
public int getId() {
return id;
}
@@ -45,7 +38,6 @@ public final class ActivityClusterId implements Serializable {
final int prime = 31;
int result = 1;
result = prime * result + id;
- result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
return result;
}
@@ -64,18 +56,11 @@ public final class ActivityClusterId implements Serializable {
if (id != other.id) {
return false;
}
- if (jobId == null) {
- if (other.jobId != null) {
- return false;
- }
- } else if (!jobId.equals(other.jobId)) {
- return false;
- }
return true;
}
@Override
public String toString() {
- return "ACID:" + jobId + ":" + id;
+ return "ACID:" + id;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
new file mode 100644
index 0000000..8cbfb1a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IWritable;
+
+public final class DeployedJobSpecId implements IWritable, Serializable {
+
+ public static final DeployedJobSpecId INVALID = new DeployedJobSpecId(-1l);
+
+ private static final long serialVersionUID = 1L;
+ private long id;
+
+ public static DeployedJobSpecId create(DataInput dis) throws IOException {
+ DeployedJobSpecId deployedJobSpecId = new DeployedJobSpecId();
+ deployedJobSpecId.readFields(dis);
+ return deployedJobSpecId;
+ }
+
+ private DeployedJobSpecId() {
+ }
+
+ public DeployedJobSpecId(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof DeployedJobSpecId)) {
+ return false;
+ }
+ return ((DeployedJobSpecId) o).id == id;
+ }
+
+ @Override
+ public String toString() {
+ return "PDJID:" + id;
+ }
+
+ public static DeployedJobSpecId parse(String str) throws HyracksDataException {
+ if (str.startsWith("PDJID:")) {
+ return new DeployedJobSpecId(Long.parseLong(str.substring(4)));
+ }
+ throw HyracksDataException.create(ErrorCode.NOT_A_JOBID, str);
+ }
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeLong(id);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ id = input.readLong();
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
new file mode 100644
index 0000000..24caa9b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class DeployedJobSpecIdFactory {
+ private final AtomicLong id = new AtomicLong(0);
+
+ public DeployedJobSpecId create() {
+ return new DeployedJobSpecId(id.getAndIncrement());
+ }
+
+ public long maxDeployedJobSpecId() {
+ return id.get();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 133e342..d23b944 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -25,7 +25,7 @@ import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksException;
public interface IActivityClusterGraphGeneratorFactory extends Serializable {
- public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
+ public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(
ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException;
public JobSpecification getJobSpecification();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
index d523ccc..bd2f189 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
@@ -23,5 +23,10 @@ import java.io.Serializable;
import org.apache.hyracks.api.context.IHyracksJobletContext;
public interface IJobletEventListenerFactory extends Serializable {
- public IJobletEventListener createListener(IHyracksJobletContext ctx);
+ IJobletEventListener createListener(IHyracksJobletContext ctx);
+
+ IJobletEventListenerFactory copyFactory();
+
+ //Allows job parameters to change listener settings
+ void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
new file mode 100644
index 0000000..551b3d7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobParameterByteStore implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private Map<byte[], byte[]> runtimeValues;
+ private final byte[] empty = new byte[0];
+
+ public JobParameterByteStore() {
+ runtimeValues = new HashMap<>();
+ }
+
+ public Map<byte[], byte[]> getParameterMap() {
+ return runtimeValues;
+ }
+
+ public void setParameters(Map<byte[], byte[]> map) {
+ runtimeValues = map;
+ }
+
+ public byte[] getParameterValue(byte[] name, int start, int length) {
+ for (Entry<byte[], byte[]> entry : runtimeValues.entrySet()) {
+ byte[] key = entry.getKey();
+ if (key.length == length) {
+ boolean matched = true;
+ for (int j = 0; j < length; j++) {
+ if (key[j] != name[j + start]) {
+ matched = false;
+ break;
+ }
+ }
+ if (matched) {
+ return entry.getValue();
+ }
+ }
+ }
+ return empty;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 327c422..4e3c0f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -24,15 +24,15 @@ import java.util.logging.Logger;
import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.control.cc.work.CancelJobWork;
import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
-import org.apache.hyracks.control.cc.work.DestroyJobWork;
-import org.apache.hyracks.control.cc.work.DistributeJobWork;
+import org.apache.hyracks.control.cc.work.DeployJobSpecWork;
import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import org.apache.hyracks.control.cc.work.GetJobInfoWork;
import org.apache.hyracks.control.cc.work.GetJobStatusWork;
@@ -42,6 +42,7 @@ import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
import org.apache.hyracks.control.cc.work.GetResultStatusWork;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
import org.apache.hyracks.control.cc.work.JobStartWork;
+import org.apache.hyracks.control.cc.work.UndeployJobSpecWork;
import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
import org.apache.hyracks.control.common.work.IPCResponder;
import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -53,10 +54,12 @@ class ClientInterfaceIPCI implements IIPCI {
private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName());
private final ClusterControllerService ccs;
private final JobIdFactory jobIdFactory;
+ private final DeployedJobSpecIdFactory deployedJobSpecIdFactory;
- ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory jobIdFactory) {
+ ClientInterfaceIPCI(final ClusterControllerService ccs, final JobIdFactory jobIdFactory) {
this.ccs = ccs;
this.jobIdFactory = jobIdFactory;
+ this.deployedJobSpecIdFactory = ccs.getDeployedJobSpecIdFactory();
}
@Override
@@ -83,16 +86,17 @@ class ClientInterfaceIPCI implements IIPCI {
new IPCResponder<JobInfo>(handle, mid)));
break;
case DISTRIBUTE_JOB:
- HyracksClientInterfaceFunctions.DistributeJobFunction djf =
- (HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
- ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory,
- new IPCResponder<JobId>(handle, mid)));
+ HyracksClientInterfaceFunctions.DeployJobSpecFunction djf =
+ (HyracksClientInterfaceFunctions.DeployJobSpecFunction) fn;
+ ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(),
+ deployedJobSpecIdFactory.create(), new IPCResponder<>(handle, mid)));
break;
case DESTROY_JOB:
- HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
- (HyracksClientInterfaceFunctions.DestroyJobFunction) fn;
+ HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf =
+ (HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn;
ccs.getWorkQueue()
- .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
+ .schedule(new UndeployJobSpecWork(ccs, dsjf.getDeployedJobSpecId(),
+ new IPCResponder<>(handle, mid)));
break;
case CANCEL_JOB:
HyracksClientInterfaceFunctions.CancelJobFunction cjf =
@@ -103,8 +107,14 @@ class ClientInterfaceIPCI implements IIPCI {
case START_JOB:
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
- ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(),
- sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory));
+ DeployedJobSpecId id = sjf.getDeployedJobSpecId();
+ byte[] acggfBytes = null;
+ if (id == null) {
+ //The job is new
+ acggfBytes = sjf.getACGGFBytes();
+ }
+ ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
+ jobIdFactory, sjf.getJobParameters(), new IPCResponder<>(handle, mid), id));
break;
case GET_DATASET_DIRECTORY_SERIVICE_INFO:
ccs.getWorkQueue().schedule(
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index af5c102..5a53fce 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -23,7 +23,7 @@ import java.util.logging.Logger;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
-import org.apache.hyracks.control.cc.work.DistributedJobFailureWork;
+import org.apache.hyracks.control.cc.work.DeployedJobFailureWork;
import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -76,13 +76,12 @@ class ClusterControllerIPCI implements IIPCI {
break;
case NOTIFY_JOBLET_CLEANUP:
CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
- ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(),
- njcf.getNodeId()));
+ ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(), njcf.getNodeId()));
break;
case NOTIFY_DEPLOY_BINARY:
CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
- ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(),
- ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+ ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(), ndbf.getNodeId(),
+ ndbf.getDeploymentStatus()));
break;
case REPORT_PROFILE:
CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
@@ -90,49 +89,48 @@ class ClusterControllerIPCI implements IIPCI {
break;
case NOTIFY_TASK_COMPLETE:
CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
- ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(),
- ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+ ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(), ntcf.getTaskId(),
+ ntcf.getNodeId(), ntcf.getStatistics()));
break;
case NOTIFY_TASK_FAILURE:
CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
- ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(),
- ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
+ ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(), ntff.getTaskId(),
+ ntff.getNodeId(), ntff.getExceptions()));
break;
- case DISTRIBUTED_JOB_FAILURE:
- CCNCFunctions.ReportDistributedJobFailureFunction rdjf =
- (CCNCFunctions.ReportDistributedJobFailureFunction) fn;
- ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId()));
+ case DEPLOYED_JOB_FAILURE:
+ CCNCFunctions.ReportDeployedJobSpecFailureFunction rdjf =
+ (CCNCFunctions.ReportDeployedJobSpecFailureFunction) fn;
+ ccs.getWorkQueue()
+ .schedule(new DeployedJobFailureWork(rdjf.getDeployedJobSpecId(), rdjf.getNodeId()));
break;
case REGISTER_PARTITION_PROVIDER:
CCNCFunctions.RegisterPartitionProviderFunction rppf =
(CCNCFunctions.RegisterPartitionProviderFunction) fn;
- ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs,
- rppf.getPartitionDescriptor()));
+ ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs, rppf.getPartitionDescriptor()));
break;
case REGISTER_PARTITION_REQUEST:
CCNCFunctions.RegisterPartitionRequestFunction rprf =
(CCNCFunctions.RegisterPartitionRequestFunction) fn;
- ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs,
- rprf.getPartitionRequest()));
+ ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs, rprf.getPartitionRequest()));
break;
case REGISTER_RESULT_PARTITION_LOCATION:
CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
- ccs.getWorkQueue().schedule(new RegisterResultPartitionLocationWork(ccs,
- rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
- rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+ ccs.getWorkQueue()
+ .schedule(new RegisterResultPartitionLocationWork(ccs, rrplf.getJobId(), rrplf.getResultSetId(),
+ rrplf.getOrderedResult(), rrplf.getEmptyResult(), rrplf.getPartition(),
+ rrplf.getNPartitions(), rrplf.getNetworkAddress()));
break;
case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwc =
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
- ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs,
- rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition()));
+ ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(),
+ rrpwc.getResultSetId(), rrpwc.getPartition()));
break;
case SEND_APPLICATION_MESSAGE:
- CCNCFunctions.SendApplicationMessageFunction rsf =
- (CCNCFunctions.SendApplicationMessageFunction) fn;
- ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, rsf.getMessage(),
- rsf.getDeploymentId(), rsf.getNodeId()));
+ CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ ccs.getWorkQueue().schedule(
+ new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId()));
break;
case GET_NODE_CONTROLLERS_INFO:
ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
@@ -150,18 +148,17 @@ class ClusterControllerIPCI implements IIPCI {
break;
case STATE_DUMP_RESPONSE:
CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
- ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, dsrf.getNodeId(),
- dsrf.getStateDumpId(), dsrf.getState()));
+ ccs.getWorkQueue().schedule(
+ new NotifyStateDumpResponse(ccs, dsrf.getNodeId(), dsrf.getStateDumpId(), dsrf.getState()));
break;
case SHUTDOWN_RESPONSE:
CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, sdrf.getNodeId()));
break;
case THREAD_DUMP_RESPONSE:
- CCNCFunctions.ThreadDumpResponseFunction tdrf =
- (CCNCFunctions.ThreadDumpResponseFunction)fn;
- ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs,
- tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
+ CCNCFunctions.ThreadDumpResponseFunction tdrf = (CCNCFunctions.ThreadDumpResponseFunction) fn;
+ ccs.getWorkQueue()
+ .schedule(new NotifyThreadDumpResponse(ccs, tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
break;
default:
LOGGER.warning("Unknown function: " + fn.getFunctionId());
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 7b99df2..713bddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -50,7 +50,10 @@ import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
@@ -107,7 +110,9 @@ public class ClusterControllerService implements IControllerService {
private CCServiceContext serviceCtx;
- private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore();
+ private final DeployedJobSpecStore deployedJobSpecStore = new DeployedJobSpecStore();
+
+ private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>();
private final WorkQueue workQueue;
@@ -135,6 +140,8 @@ public class ClusterControllerService implements IControllerService {
private final JobIdFactory jobIdFactory;
+ private final DeployedJobSpecIdFactory deployedJobSpecIdFactory;
+
private IJobManager jobManager;
private ShutdownRun shutdownCallback;
@@ -164,8 +171,8 @@ public class ClusterControllerService implements IControllerService {
final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
- ccConfig.getResultSweepThreshold(), preDistributedJobStore);
+ datasetDirectoryService =
+ new DatasetDirectoryService(ccConfig.getResultTTL(), ccConfig.getResultSweepThreshold());
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
@@ -175,6 +182,8 @@ public class ClusterControllerService implements IControllerService {
nodeManager = new NodeManager(this, ccConfig, resourceManager);
jobIdFactory = new JobIdFactory();
+
+ deployedJobSpecIdFactory = new DeployedJobSpecIdFactory();
}
private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -347,8 +356,21 @@ public class ClusterControllerService implements IControllerService {
return nodeManager;
}
- public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException {
- return preDistributedJobStore;
+ public DeployedJobSpecStore getDeployedJobSpecStore() throws HyracksException {
+ return deployedJobSpecStore;
+ }
+
+ public void removeJobParameterByteStore(JobId jobId) throws HyracksException {
+ jobParameterByteStoreMap.remove(jobId);
+ }
+
+ public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
+ JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+ if (jpbs == null) {
+ jpbs = new JobParameterByteStore();
+ jobParameterByteStoreMap.put(jobId, jpbs);
+ }
+ return jpbs;
}
public IResourceManager getResourceManager() {
@@ -397,6 +419,10 @@ public class ClusterControllerService implements IControllerService {
return jobIdFactory;
}
+ public DeployedJobSpecIdFactory getDeployedJobSpecIdFactory() {
+ return deployedJobSpecIdFactory;
+ }
+
private final class ClusterControllerContext implements ICCContext {
private final ClusterTopology topology;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
new file mode 100644
index 0000000..1a3051e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class DeployedJobSpecStore {
+
+ private final Map<DeployedJobSpecId, DeployedJobSpecDescriptor> deployedJobSpecDescriptorMap;
+
+ public DeployedJobSpecStore() {
+ deployedJobSpecDescriptorMap = new Hashtable<>();
+ }
+
+ public void addDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId,
+ ActivityClusterGraph activityClusterGraph,
+ JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
+ throws HyracksException {
+ if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
+ }
+ DeployedJobSpecDescriptor descriptor =
+ new DeployedJobSpecDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
+ deployedJobSpecDescriptorMap.put(deployedJobSpecId, descriptor);
+ }
+
+ public void checkForExistingDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+ if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
+ }
+ }
+
+ public DeployedJobSpecDescriptor getDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId)
+ throws HyracksException {
+ DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId);
+ if (descriptor == null) {
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId);
+ }
+ return descriptor;
+ }
+
+ public void removeDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+ DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId);
+ if (descriptor == null) {
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId);
+ }
+ deployedJobSpecDescriptorMap.remove(deployedJobSpecId);
+ }
+
+ public class DeployedJobSpecDescriptor {
+
+ private final ActivityClusterGraph activityClusterGraph;
+
+ private final JobSpecification jobSpecification;
+
+ private final Set<Constraint> activityClusterGraphConstraints;
+
+ private DeployedJobSpecDescriptor(ActivityClusterGraph activityClusterGraph,
+ JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
+ this.activityClusterGraph = activityClusterGraph;
+ this.jobSpecification = jobSpecification;
+ this.activityClusterGraphConstraints = activityClusterGraphConstraints;
+ }
+
+ public ActivityClusterGraph getActivityClusterGraph() {
+ return activityClusterGraph;
+ }
+
+ public JobSpecification getJobSpecification() {
+ return jobSpecification;
+ }
+
+ public Set<Constraint> getActivityClusterGraphConstraints() {
+ return activityClusterGraphConstraints;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
deleted file mode 100644
index 117621f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc;
-
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class PreDistributedJobStore {
-
- private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
-
- public PreDistributedJobStore() {
- preDistributedJobDescriptorMap = new Hashtable<>();
- }
-
- public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph,
- JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
- throws HyracksException {
- if (preDistributedJobDescriptorMap.get(jobId) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
- }
- PreDistributedJobDescriptor descriptor =
- new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
- preDistributedJobDescriptorMap.put(jobId, descriptor);
- }
-
- public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException {
- if (preDistributedJobDescriptorMap.get(jobId) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
- }
- }
-
- public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException {
- PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
- if (descriptor == null) {
- throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
- }
- return descriptor;
- }
-
- public boolean jobIsPredistributed(JobId jobId) {
- return preDistributedJobDescriptorMap.get(jobId) != null;
- }
-
- public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
- PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
- if (descriptor == null) {
- throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
- }
- preDistributedJobDescriptorMap.remove(jobId);
- }
-
- public class PreDistributedJobDescriptor {
-
- private final ActivityClusterGraph activityClusterGraph;
-
- private final JobSpecification jobSpecification;
-
- private final Set<Constraint> activityClusterGraphConstraints;
-
- private PreDistributedJobDescriptor(ActivityClusterGraph activityClusterGraph,
- JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
- this.activityClusterGraph = activityClusterGraph;
- this.jobSpecification = jobSpecification;
- this.activityClusterGraphConstraints = activityClusterGraphConstraints;
- }
-
- public ActivityClusterGraph getActivityClusterGraph() {
- return activityClusterGraph;
- }
-
- public JobSpecification getJobSpecification() {
- return jobSpecification;
- }
-
- public Set<Constraint> getActivityClusterGraphConstraints() {
- return activityClusterGraphConstraints;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ca1c91b..1cb07d0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -42,7 +42,6 @@ import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.cc.PreDistributedJobStore;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -63,14 +62,10 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
private final Map<JobId, JobResultInfo> jobResultLocations;
- private final PreDistributedJobStore preDistributedJobStore;
-
- public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
- PreDistributedJobStore preDistributedJobStore) {
+ public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
- this.preDistributedJobStore = preDistributedJobStore;
- jobResultLocations = new LinkedHashMap<>();
+ jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
}
@Override
@@ -186,9 +181,6 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
@Override
public synchronized long getResultTimestamp(JobId jobId) {
- if (preDistributedJobStore.jobIsPredistributed(jobId)) {
- return -1;
- }
return getState(jobId).getTimestamp();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 8a69a6f..0b69024 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -47,6 +47,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
@@ -77,7 +78,7 @@ public class JobExecutor {
private final PartitionConstraintSolver solver;
- private final boolean predistributed;
+ private final DeployedJobSpecId deployedJobSpecId;
private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
@@ -88,10 +89,10 @@ public class JobExecutor {
private boolean cancelled = false;
public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
- boolean predistributed) {
+ DeployedJobSpecId deployedJobSpecId) {
this.ccs = ccs;
this.jobRun = jobRun;
- this.predistributed = predistributed;
+ this.deployedJobSpecId = deployedJobSpecId;
solver = new PartitionConstraintSolver();
partitionProducingTaskClusterMap = new HashMap<>();
inProgressTaskClusters = new HashSet<>();
@@ -99,8 +100,8 @@ public class JobExecutor {
random = new Random();
}
- public boolean isPredistributed() {
- return predistributed;
+ public boolean isDeployed() {
+ return deployedJobSpecId != null;
}
public JobRun getJobRun() {
@@ -502,7 +503,7 @@ public class JobExecutor {
new HashMap<>(jobRun.getConnectorPolicyMap());
INodeManager nodeManager = ccs.getNodeManager();
try {
- byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
+ byte[] acgBytes = isDeployed() ? null : JavaSerializationUtils.serialize(acg);
for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
String nodeId = entry.getKey();
final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
@@ -515,7 +516,8 @@ public class JobExecutor {
}
byte[] jagBytes = changed ? acgBytes : null;
node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
- connectorPolicies, jobRun.getFlags());
+ connectorPolicies, jobRun.getFlags(),
+ ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId);
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index fa22dd3..26f8022 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -142,6 +142,7 @@ public class JobManager implements IJobManager {
@Override
public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
+ ccs.removeJobParameterByteStore(run.getJobId());
checkJob(run);
if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
run.setPendingStatus(JobStatus.FAILURE, exceptions);
@@ -306,9 +307,7 @@ public class JobManager implements IJobManager {
CCServiceContext serviceCtx = ccs.getContext();
JobSpecification spec = run.getJobSpecification();
- if (!run.getExecutor().isPredistributed()) {
- serviceCtx.notifyJobCreation(jobId, spec);
- }
+ serviceCtx.notifyJobCreation(jobId, spec);
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index ef0bca2..58f44ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.ActivityClusterId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
@@ -45,7 +46,7 @@ import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.PreDistributedJobStore.PreDistributedJobDescriptor;
+import org.apache.hyracks.control.cc.DeployedJobSpecStore.DeployedJobSpecDescriptor;
import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
import org.apache.hyracks.control.cc.executor.JobExecutor;
import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
@@ -114,21 +115,23 @@ public class JobRun implements IJobStatusConditionVariable {
createTime = System.currentTimeMillis();
}
- //Run a Pre-distributed job by passing the JobId
+ //Run a deployed job spec
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
- PreDistributedJobDescriptor distributedJobDescriptor)
+ DeployedJobSpecDescriptor deployedJobSpecDescriptor, Map<byte[], byte[]> jobParameters,
+ DeployedJobSpecId deployedJobSpecId)
throws HyracksException {
this(deploymentId, jobId, jobFlags,
- distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
- Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
- this.scheduler = new JobExecutor(ccs, this, constaints, true);
+ deployedJobSpecDescriptor.getJobSpecification(), deployedJobSpecDescriptor.getActivityClusterGraph());
+ ccs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
+ Set<Constraint> constaints = deployedJobSpecDescriptor.getActivityClusterGraphConstraints();
+ this.scheduler = new JobExecutor(ccs, this, constaints, deployedJobSpecId);
}
//Run a new job by creating an ActivityClusterGraph
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) {
this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize());
- this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
+ this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), null);
}
public DeploymentId getDeploymentId() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
new file mode 100644
index 0000000..f7335a8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DeployJobSpecWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final byte[] acggfBytes;
+ private final DeployedJobSpecId deployedJobSpecId;
+ private final IResultCallback<DeployedJobSpecId> callback;
+
+ public DeployJobSpecWork(ClusterControllerService ccs, byte[] acggfBytes, DeployedJobSpecId deployedJobSpecId,
+ IResultCallback<DeployedJobSpecId> callback) {
+ this.deployedJobSpecId = deployedJobSpecId;
+ this.ccs = ccs;
+ this.acggfBytes = acggfBytes;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ final CCServiceContext ccServiceCtx = ccs.getContext();
+ ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId);
+ IActivityClusterGraphGeneratorFactory acggf =
+ (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
+ IActivityClusterGraphGenerator acgg =
+ acggf.createActivityClusterGraphGenerator(ccServiceCtx, EnumSet.noneOf(JobFlag.class));
+ ActivityClusterGraph acg = acgg.initialize();
+ ccs.getDeployedJobSpecStore().addDeployedJobSpecDescriptor(deployedJobSpecId, acg,
+ acggf.getJobSpecification(),
+ acgg.getConstraints());
+
+ byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+
+ INodeManager nodeManager = ccs.getNodeManager();
+ for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+ node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes);
+ }
+ callback.setValue(deployedJobSpecId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
new file mode 100644
index 0000000..8afdf42
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DeployedJobFailureWork extends SynchronizableWork {
+ protected final DeployedJobSpecId deployedJobSpecId;
+ protected final String nodeId;
+
+ public DeployedJobFailureWork(DeployedJobSpecId deployedJobSpecId, String nodeId) {
+ this.deployedJobSpecId = deployedJobSpecId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void doRun() throws HyracksException {
+ throw HyracksException.create(ErrorCode.DEPLOYED_JOB_FAILURE, deployedJobSpecId, nodeId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
deleted file mode 100644
index df98252..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-
-public class DestroyJobWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private final JobId jobId;
- private final IResultCallback<JobId> callback;
-
- public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) {
- this.jobId = jobId;
- this.ccs = ccs;
- this.callback = callback;
- }
-
- @Override
- protected void doRun() throws Exception {
- try {
- ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId);
- INodeManager nodeManager = ccs.getNodeManager();
- for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
- node.getNodeController().destroyJob(jobId);
- }
- callback.setValue(jobId);
- } catch (Exception e) {
- callback.setException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
deleted file mode 100644
index 5a57b1b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import java.util.EnumSet;
-
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
-import org.apache.hyracks.api.util.JavaSerializationUtils;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCServiceContext;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
-import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-
-public class DistributeJobWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private final byte[] acggfBytes;
- private final JobIdFactory jobIdFactory;
- private final IResultCallback<JobId> callback;
-
- public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobIdFactory jobIdFactory,
- IResultCallback<JobId> callback) {
- this.jobIdFactory = jobIdFactory;
- this.ccs = ccs;
- this.acggfBytes = acggfBytes;
- this.callback = callback;
- }
-
- @Override
- protected void doRun() throws Exception {
- try {
- JobId jobId = jobIdFactory.create();
- final CCServiceContext ccServiceCtx = ccs.getContext();
- ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
- IActivityClusterGraphGeneratorFactory acggf =
- (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
- IActivityClusterGraphGenerator acgg =
- acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, EnumSet.noneOf(JobFlag.class));
- ActivityClusterGraph acg = acgg.initialize();
- ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(),
- acgg.getConstraints());
-
- ccServiceCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
-
- byte[] acgBytes = JavaSerializationUtils.serialize(acg);
-
- INodeManager nodeManager = ccs.getNodeManager();
- for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
- node.getNodeController().distributeJob(jobId, acgBytes);
- }
-
- callback.setValue(jobId);
- } catch (Exception e) {
- callback.setException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
deleted file mode 100644
index f7fa2a4..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-
-public class DistributedJobFailureWork extends SynchronizableWork {
- protected final JobId jobId;
- protected final String nodeId;
-
- public DistributedJobFailureWork(JobId jobId, String nodeId) {
- this.jobId = jobId;
- this.nodeId = nodeId;
- }
-
- @Override
- public void doRun() throws HyracksException {
- throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, jobId, nodeId);
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index ed82705..cfedfc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,9 +18,11 @@
*/
package org.apache.hyracks.control.cc.work;
-import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
@@ -37,20 +39,23 @@ import org.apache.hyracks.control.common.work.SynchronizableWork;
public class JobStartWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final byte[] acggfBytes;
- private final EnumSet<JobFlag> jobFlags;
+ private final Set<JobFlag> jobFlags;
private final DeploymentId deploymentId;
- private final JobId preDistributedJobId;
private final IResultCallback<JobId> callback;
private final JobIdFactory jobIdFactory;
+ private final DeployedJobSpecId deployedJobSpecId;
+ private final Map<byte[], byte[]> jobParameters;
public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
- EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, JobIdFactory jobIdFactory) {
+ Set<JobFlag> jobFlags, JobIdFactory jobIdFactory, Map<byte[], byte[]> jobParameters,
+ IResultCallback<JobId> callback, DeployedJobSpecId deployedJobSpecId) {
this.deploymentId = deploymentId;
- this.preDistributedJobId = jobId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.callback = callback;
+ this.deployedJobSpecId = deployedJobSpecId;
+ this.jobParameters = jobParameters;
this.jobIdFactory = jobIdFactory;
}
@@ -61,19 +66,18 @@ public class JobStartWork extends SynchronizableWork {
final CCServiceContext ccServiceCtx = ccs.getContext();
JobId jobId;
JobRun run;
- if (preDistributedJobId == null) {
- jobId = jobIdFactory.create();
+ jobId = jobIdFactory.create();
+ if (deployedJobSpecId == null) {
//Need to create the ActivityClusterGraph
IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
.deserialize(acggfBytes, deploymentId, ccServiceCtx);
- IActivityClusterGraphGenerator acgg =
- acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags);
+ IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(ccServiceCtx, jobFlags);
run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
} else {
- jobId = preDistributedJobId;
//ActivityClusterGraph has already been distributed
run = new JobRun(ccs, deploymentId, jobId, jobFlags,
- ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+ ccs.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(deployedJobSpecId), jobParameters,
+ deployedJobSpecId);
}
jobManager.add(run);
callback.setValue(jobId);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
new file mode 100644
index 0000000..143c8c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class UndeployJobSpecWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final DeployedJobSpecId deployedJobSpecId;
+ private final IResultCallback<DeployedJobSpecId> callback;
+
+ public UndeployJobSpecWork(ClusterControllerService ccs, DeployedJobSpecId deployedJobSpecId,
+ IResultCallback<DeployedJobSpecId> callback) {
+ this.deployedJobSpecId = deployedJobSpecId;
+ this.ccs = ccs;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ ccs.getDeployedJobSpecStore().removeDeployedJobSpecDescriptor(deployedJobSpecId);
+ INodeManager nodeManager = ccs.getNodeManager();
+ for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+ node.getNodeController().undeployJobSpec(deployedJobSpecId);
+ }
+ callback.setValue(deployedJobSpecId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index ec8e045..6fd321e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -44,7 +45,7 @@ public interface IClusterController {
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
throws Exception;
- public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception;
+ public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index a10f8f0..5d781cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.control.common.base;
import java.net.URL;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +28,7 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
@@ -38,7 +38,8 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController {
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags) throws Exception;
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+ throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
@@ -50,9 +51,9 @@ public interface INodeController {
public void undeployBinary(DeploymentId deploymentId) throws Exception;
- public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+ public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
- public void destroyJob(JobId jobId) throws Exception;
+ public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
public void dumpState(String stateDumpId) throws Exception;