You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/03/27 23:48:27 UTC
[04/12] asterixdb git commit: [NO ISSUE][RT] Add job start timestamp to the joblet context
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
index 1ddb7e8..2879576 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
@@ -23,14 +23,14 @@ import java.io.IOException;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
import org.apache.asterix.om.base.temporal.DateTimeFormatUtils;
-import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
+import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -45,8 +45,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_DATE;
- private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -76,6 +74,8 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
private StringBuilder sbder = new StringBuilder();
private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+ private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance();
+
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
resultStorage.reset();
@@ -101,11 +101,11 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
int formatLength = UTF8StringUtil.getUTFLength(bytes1, offset1 + 1);
int offset = UTF8StringUtil.getNumBytesToStoreLength(formatLength);
sbder.delete(0, sbder.length());
- DT_UTILS.printDateTime(chronon, 0, bytes1, offset1 + 1 + offset, formatLength, sbder,
+ util.printDateTime(chronon, 0, bytes1, offset1 + 1 + offset, formatLength, sbder,
DateTimeParseMode.DATE_ONLY);
out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
- utf8Writer.writeUTF8(sbder.toString(), out);
+ utf8Writer.writeUTF8(sbder, out);
} catch (IOException ex) {
throw new HyracksDataException(ex);
}
@@ -113,7 +113,6 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
}
};
}
-
};
}
@@ -122,7 +121,6 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
*/
@Override
public FunctionIdentifier getIdentifier() {
- return FID;
+ return BuiltinFunctions.PRINT_DATE;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
index b9f6e50..6f22bf1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
@@ -24,12 +24,12 @@ import java.io.IOException;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.om.base.temporal.DateTimeFormatUtils;
import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -44,8 +44,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_DATETIME;
- private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -75,6 +73,8 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
private UTF8StringWriter utf8Writer = new UTF8StringWriter();
private UTF8StringPointable utf8Ptr = new UTF8StringPointable();
+ private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance();
+
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
resultStorage.reset();
@@ -100,11 +100,11 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
utf8Ptr.set(bytes1, offset1 + 1, len1 - 1);
int formatLength = utf8Ptr.getUTF8Length();
sbder.delete(0, sbder.length());
- DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
+ util.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
formatLength, sbder, DateTimeParseMode.DATETIME);
out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
- utf8Writer.writeUTF8(sbder.toString(), out);
+ utf8Writer.writeUTF8(sbder, out);
} catch (IOException ex) {
throw new HyracksDataException(ex);
}
@@ -112,7 +112,6 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
}
};
}
-
};
}
@@ -121,7 +120,6 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
*/
@Override
public FunctionIdentifier getIdentifier() {
- return FID;
+ return BuiltinFunctions.PRINT_DATETIME;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
index 51c9dcd..0e7004f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
@@ -24,12 +24,12 @@ import java.io.IOException;
import org.apache.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
import org.apache.asterix.om.base.temporal.DateTimeFormatUtils;
import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -44,8 +44,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_TIME;
- private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -74,6 +72,8 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
private final UTF8StringWriter writer = new UTF8StringWriter();
private final UTF8StringPointable utf8Ptr = new UTF8StringPointable();
+ private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance();
+
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
resultStorage.reset();
@@ -100,11 +100,11 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
utf8Ptr.set(bytes1, offset1 + 1, len1 - 1);
int formatLength = utf8Ptr.getUTF8Length();
sbder.delete(0, sbder.length());
- DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
+ util.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
formatLength, sbder, DateTimeParseMode.TIME_ONLY);
out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
- writer.writeUTF8(sbder.toString(), out);
+ writer.writeUTF8(sbder, out);
} catch (IOException ex) {
throw new HyracksDataException(ex);
}
@@ -112,7 +112,6 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
}
};
}
-
};
}
@@ -121,7 +120,6 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
*/
@Override
public FunctionIdentifier getIdentifier() {
- return FID;
+ return BuiltinFunctions.PRINT_TIME;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 1100335..7a59926 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -31,6 +31,8 @@ public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocat
JobId getJobId();
+ long getJobStartTime();
+
ICounterContext getCounterContext();
Object getGlobalJobData();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index ac06344..5245571 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -518,7 +518,8 @@ public class JobExecutor {
byte[] jagBytes = changed ? acgBytes : null;
node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
connectorPolicies, jobRun.getFlags(),
- ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId);
+ ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId,
+ jobRun.getStartTime());
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 9ec55f4..92764a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -38,7 +38,7 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController {
void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long startTime)
throws Exception;
void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index c10c8981..8e02936 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -764,11 +764,12 @@ public class CCNCFunctions {
private final Set<JobFlag> flags;
private final Map<byte[], byte[]> jobParameters;
private final DeployedJobSpecId deployedJobSpecId;
+ private final long jobStartTime;
public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags,
- Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) {
+ Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long jobStartTime) {
this.deploymentId = deploymentId;
this.jobId = jobId;
this.planBytes = planBytes;
@@ -777,6 +778,7 @@ public class CCNCFunctions {
this.flags = flags;
this.jobParameters = jobParameters;
this.deployedJobSpecId = deployedJobSpecId;
+ this.jobStartTime = jobStartTime;
}
@Override
@@ -816,6 +818,10 @@ public class CCNCFunctions {
return flags;
}
+ public long getJobStartTime() {
+ return jobStartTime;
+ }
+
public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
DataInputStream dis = new DataInputStream(bais);
@@ -885,8 +891,10 @@ public class CCNCFunctions {
deployedJobSpecId = DeployedJobSpecId.create(dis);
}
+ long jobStartTime = dis.readLong();
+
return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags,
- jobParameters, deployedJobSpecId);
+ jobParameters, deployedJobSpecId, jobStartTime);
}
public static void serialize(OutputStream out, Object object) throws Exception {
@@ -935,11 +943,13 @@ public class CCNCFunctions {
}
//write deployed job spec id
- dos.writeBoolean(fn.getDeployedJobSpecId() == null ? false : true);
+ dos.writeBoolean(fn.getDeployedJobSpecId() != null);
if (fn.getDeployedJobSpecId() != null) {
fn.getDeployedJobSpecId().writeFields(dos);
}
+ //write job start time
+ dos.writeLong(fn.jobStartTime);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 429cb26..b78e53f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -63,10 +63,10 @@ public class NodeControllerRemoteProxy implements INodeController {
@Override
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
- throws Exception {
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId,
+ long jobStartTime) throws Exception {
StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors,
- connectorPolicies, flags, jobParameters, deployedJobSpecId);
+ connectorPolicies, flags, jobParameters, deployedJobSpecId, jobStartTime);
ipcHandle.send(-1, stf, null);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8790434..55bc192 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -103,9 +103,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
private final IJobletEventListenerFactory jobletEventListenerFactory;
+ private final long jobStartTime;
+
public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
INCServiceContext serviceCtx, ActivityClusterGraph acg,
- IJobletEventListenerFactory jobletEventListenerFactory) {
+ IJobletEventListenerFactory jobletEventListenerFactory, long jobStartTime) {
this.nodeController = nodeController;
this.serviceCtx = serviceCtx;
this.deploymentId = deploymentId;
@@ -131,6 +133,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
}
IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
+ this.jobStartTime = jobStartTime;
}
@Override
@@ -151,6 +154,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
return env;
}
+ @Override
+ public long getJobStartTime() {
+ return jobStartTime;
+ }
+
public void addTask(Task task) {
taskMap.put(task.getTaskAttemptId(), task);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index f55e250..735f7cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -65,7 +65,7 @@ final class NodeControllerIPCI implements IIPCI {
ncs.getWorkQueue()
.schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags(),
- stf.getJobParameters(), stf.getDeployedJobSpecId()));
+ stf.getJobParameters(), stf.getDeployedJobSpecId(), stf.getJobStartTime()));
return;
case ABORT_TASKS:
CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 6a5785a..660621e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -95,10 +95,12 @@ public class StartTasksWork extends AbstractWork {
private final Map<byte[], byte[]> jobParameters;
+ private final long jobStartTime;
+
public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags,
- Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) {
+ Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long jobStartTime) {
this.ncs = ncs;
this.deploymentId = deploymentId;
this.jobId = jobId;
@@ -108,6 +110,7 @@ public class StartTasksWork extends AbstractWork {
this.connectorPoliciesMap = connectorPoliciesMap;
this.flags = flags;
this.jobParameters = jobParameters;
+ this.jobStartTime = jobStartTime;
}
@Override
@@ -212,7 +215,7 @@ public class StartTasksWork extends AbstractWork {
}
listenerFactory.updateListenerJobParameters(ncs.createOrGetJobParameterByteStore(jobId));
}
- ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory);
+ ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory, jobStartTime);
jobletMap.put(jobId, ji);
}
return ji;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 1d38d3d..b022b15 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -39,6 +39,7 @@ public class TestJobletContext implements IHyracksJobletContext {
private final FrameManager frameManger;
private JobId jobId;
private WorkspaceFileFactory fileFactory;
+ private final long jobStartTime;
public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
this.frameSize = frameSize;
@@ -46,6 +47,7 @@ public class TestJobletContext implements IHyracksJobletContext {
this.jobId = jobId;
fileFactory = new WorkspaceFileFactory(this, (IIOManager) getIOManager());
this.frameManger = new FrameManager(frameSize);
+ this.jobStartTime = System.currentTimeMillis();
}
ByteBuffer allocateFrame() throws HyracksDataException {
@@ -113,6 +115,11 @@ public class TestJobletContext implements IHyracksJobletContext {
}
@Override
+ public long getJobStartTime() {
+ return jobStartTime;
+ }
+
+ @Override
public Object getGlobalJobData() {
return null;
}