You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org> on 2017/05/16 21:32:30 UTC
Change in asterixdb[master]: Build Test do not merge
Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1752
Change subject: Build Test do not merge
......................................................................
Build Test do not merge
Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
28 files changed, 288 insertions(+), 55 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/52/1752/1
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index b4ed8e5..5a1437e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -54,13 +54,13 @@
if (entityId != null) {
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind());
- if (event.getEventKind() == Kind.JOB_FINISHED) {
- LOGGER.log(Level.FINER, "Removing the job");
- jobId2ActiveJobInfos.remove(event.getJobId());
- }
if (listener != null) {
LOGGER.log(Level.FINER, "Notifying the listener");
listener.notify(event);
+ } else if (event.getEventKind() == Kind.JOB_FINISHED) {
+ LOGGER.log(Level.FINER, "Removing the job");
+ jobId2ActiveJobInfos.remove(event.getJobId());
+
}
} else {
@@ -75,9 +75,11 @@
LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
}
- public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+ public synchronized void removeListener(IActiveEntityEventsListener listener, JobId jobId) throws HyracksDataException {
LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
unregisterListener(listener);
+ LOGGER.log(Level.FINER, "Removing the job");
+ jobId2ActiveJobInfos.remove(jobId);
}
public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index 6f24fc5..345d670 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -131,6 +131,8 @@
import org.apache.asterix.runtime.evaluators.constructors.AYearMonthDurationConstructorDescriptor;
import org.apache.asterix.runtime.evaluators.constructors.ClosedRecordConstructorDescriptor;
import org.apache.asterix.runtime.evaluators.constructors.OpenRecordConstructorDescriptor;
+import org.apache.asterix.runtime.evaluators.constructors.OrderedListConstructorDescriptor;
+import org.apache.asterix.runtime.evaluators.constructors.UnorderedListConstructorDescriptor;
import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
import org.apache.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
@@ -155,6 +157,7 @@
import org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
import org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor;
import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.GetJobParameterDescriptor;
import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor;
import org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor;
import org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor;
@@ -197,7 +200,6 @@
import org.apache.asterix.runtime.evaluators.functions.NumericTruncDescriptor;
import org.apache.asterix.runtime.evaluators.functions.NumericUnaryMinusDescriptor;
import org.apache.asterix.runtime.evaluators.functions.OrDescriptor;
-import org.apache.asterix.runtime.evaluators.constructors.OrderedListConstructorDescriptor;
import org.apache.asterix.runtime.evaluators.functions.PrefixLenJaccardDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardCheckDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardDescriptor;
@@ -245,7 +247,6 @@
import org.apache.asterix.runtime.evaluators.functions.SubstringDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SwitchCaseDescriptor;
import org.apache.asterix.runtime.evaluators.functions.UUIDDescriptor;
-import org.apache.asterix.runtime.evaluators.constructors.UnorderedListConstructorDescriptor;
import org.apache.asterix.runtime.evaluators.functions.WordTokensDescriptor;
import org.apache.asterix.runtime.evaluators.functions.binary.BinaryConcatDescriptor;
import org.apache.asterix.runtime.evaluators.functions.binary.BinaryLengthDescriptor;
@@ -419,6 +420,9 @@
// Inject failure function
temp.add(InjectFailureDescriptor.FACTORY);
+ // Get Job Parameter function
+ temp.add(GetJobParameterDescriptor.FACTORY);
+
// Switch case
temp.add(SwitchCaseDescriptor.FACTORY);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b7aae59..3cbfc9b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -762,7 +762,7 @@
}
- protected void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
+ public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
ProgressState progress = ProgressState.NO_PROGRESS;
CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
@@ -1630,7 +1630,7 @@
CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
cfs.getSignature().setNamespace(dataverse);
- String functionName = cfs.getaAterixFunction().getName();
+ String functionName = cfs.getSignature().getName();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1641,7 +1641,7 @@
if (dv == null) {
throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
}
- Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
+ Function function = new Function(dataverse, functionName, cfs.getSignature().getArity(),
cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), Function.LANGUAGE_AQL,
FunctionKind.SCALAR.toString());
MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 6f3b667..5172686 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -110,7 +110,7 @@
JobStatus status = hcc.getJobStatus(jobId);
state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- activeLcListener.getNotificationHandler().removeListener(this);
+ activeLcListener.getNotificationHandler().removeListener(this, jobId);
}
private void start(ActiveEvent event) {
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 6d74957..2b7cad6 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -34,10 +34,6 @@
private final boolean ifNotExists;
private final List<String> paramList;
- public FunctionSignature getaAterixFunction() {
- return signature;
- }
-
public String getFunctionBody() {
return functionBody;
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 0769596..ca75987 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -797,6 +797,9 @@
public static final FunctionIdentifier EXTERNAL_LOOKUP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"external-lookup", FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier GET_JOB_PARAMETER =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "get-job-param", 1);
+
public static final FunctionIdentifier META = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta",
FunctionIdentifier.VARARGS);
public static final FunctionIdentifier META_KEY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
@@ -1172,6 +1175,9 @@
// external lookup
addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false);
+ // get job parameter
+ addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false);
+
// unnesting function
addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
index 1f9909c..4a421cc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
@@ -22,8 +22,8 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -48,7 +48,7 @@
final GrowableArray resultArray = new GrowableArray();
final UTF8StringBuilder resultBuilder = new UTF8StringBuilder();
private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
- private final DataOutput dataOutput = resultStorage.getDataOutput();
+ protected final DataOutput dataOutput = resultStorage.getDataOutput();
private final FunctionIdentifier funcID;
AbstractUnaryStringStringEval(IHyracksTaskContext context, IScalarEvaluatorFactory argEvalFactory,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
new file mode 100644
index 0000000..9d73dcc
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class GetJobParameterDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new GetJobParameterDescriptor();
+ }
+ };
+
+ @Override
+ public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ return new IScalarEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new AbstractUnaryStringStringEval(ctx, args[0],
+ GetJobParameterDescriptor.this.getIdentifier()) {
+ private byte[] result;
+
+ @Override
+ protected void process(UTF8StringPointable inputString, IPointable resultPointable)
+ throws IOException {
+ result = ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+ inputString.getLength());
+ }
+
+ @Override
+ void writeResult(IPointable resultPointable) throws IOException {
+ resultPointable.set(result, 0, result.length);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.GET_JOB_PARAMETER;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa9232e..fecce67 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -20,8 +20,9 @@
import java.io.Serializable;
import java.net.URL;
-import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
import org.apache.hyracks.api.dataset.ResultSetId;
@@ -165,32 +166,39 @@
private static final long serialVersionUID = 1L;
private final byte[] acggfBytes;
- private final EnumSet<JobFlag> jobFlags;
+ private final Set<JobFlag> jobFlags;
private final DeploymentId deploymentId;
private final JobId jobId;
+ private final Map<byte[], byte[]> jobParameters;
- public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
+ public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags, JobId jobId,
+ Map<byte[], byte[]> jobParameters) {
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.deploymentId = deploymentId;
this.jobId = jobId;
+ this.jobParameters = jobParameters;
}
- public StartJobFunction(JobId jobId) {
- this(null, null, null, jobId);
+ public StartJobFunction(JobId jobId, Map<byte[], byte[]> jobParameters) {
+ this(null, null, null, jobId, jobParameters);
}
- public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
- this(null, acggfBytes, jobFlags, null);
+ public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+ this(null, acggfBytes, jobFlags, null, null);
}
- public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
- this(deploymentId, acggfBytes, jobFlags, null);
+ public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) {
+ this(deploymentId, acggfBytes, jobFlags, null, null);
}
@Override
public FunctionId getFunctionId() {
return FunctionId.START_JOB;
+ }
+
+ public Map<byte[], byte[]> getJobParameters() {
+ return jobParameters;
}
public JobId getJobId() {
@@ -201,7 +209,7 @@
return acggfBytes;
}
- public EnumSet<JobFlag> getJobFlags() {
+ public Set<JobFlag> getJobFlags() {
return jobFlags;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..5ff621d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -76,9 +76,9 @@
}
@Override
- public JobId startJob(JobId jobId) throws Exception {
+ public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
- new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
+ new HyracksClientInterfaceFunctions.StartJobFunction(jobId, jobParameters);
return (JobId) rpci.call(ipcHandle, sjf);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..574c9a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -120,8 +120,12 @@
}
@Override
- public JobId startJob(JobId jobId) throws Exception {
- return hci.startJob(jobId);
+ public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception {
+ JobStatus status = getJobStatus(jobId);
+ if (status != null && status.equals(JobStatus.RUNNING)) {
+ throw new HyracksException("Tried to run a pre-distributed job while already running");
+ }
+ return hci.startJob(jobId, jobParameters);
}
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..3b6b57a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -20,6 +20,7 @@
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -110,9 +111,11 @@
*
* @param jobId
* The id of the predistributed job
+ * @param jobParameters
+ * The serialized job parameters
* @throws Exception
*/
- public JobId startJob(JobId jobId) throws Exception;
+ public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception;
/**
* Start the specified Job.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..ec4026b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,7 +38,7 @@
public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
- public JobId startJob(JobId jobId) throws Exception;
+ public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception;
public void cancelJob(JobId jobId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index c8e4cf8..612fa36 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -48,4 +48,6 @@
void setSharedObject(Object object);
Object getSharedObject();
+
+ public byte[] getJobParameter(byte[] name, int start, int length);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
index b64e2d5..647522c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
@@ -24,13 +24,14 @@
import java.util.List;
import java.util.Map;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
public class ActivityClusterGraph implements Serializable {
private static final long serialVersionUID = 1L;
@@ -42,6 +43,8 @@
private final Map<ActivityId, ActivityCluster> activityMap;
private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
+
+ private JobParameterByteStore jobParameterByteStore;
private int frameSize;
@@ -62,6 +65,7 @@
activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
activityMap = new HashMap<ActivityId, ActivityCluster>();
connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+ jobParameterByteStore = new JobParameterByteStore();
frameSize = 32768;
reportTaskDetails = true;
}
@@ -74,6 +78,10 @@
return connectorMap;
}
+ public JobParameterByteStore getJobParameterByteStore() {
+ return jobParameterByteStore;
+ }
+
public Map<ActivityClusterId, ActivityCluster> getActivityClusterMap() {
return activityClusterMap;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
new file mode 100644
index 0000000..e2e324b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobParameterByteStore implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private Map<byte[], byte[]> vars;
+ private final byte[] empty = new byte[0];
+
+ public JobParameterByteStore() {
+ vars = new HashMap<>();
+ }
+
+ public Map<byte[], byte[]> getParameterMap() {
+ return vars;
+ }
+
+ public void setParameters(Map<byte[], byte[]> map) {
+ vars = map;
+ }
+
+ public byte[] getParameterValue(byte[] name, int start, int length) {
+ for (Entry<byte[], byte[]> entry : vars.entrySet()) {
+ byte[] key = entry.getKey();
+ if (key.length == length) {
+ boolean matched = true;
+ for (int j = 0; j < length; j++) {
+ if (key[j] != name[j + start]) {
+ matched = false;
+ break;
+ }
+ }
+ if (matched) {
+ return entry.getValue();
+ }
+ }
+ }
+ return empty;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index ced3d67..a32d5f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -116,7 +116,7 @@
predistributed = true;
}
ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
- jobId, new IPCResponder<JobId>(handle, mid), predistributed));
+ jobId, sjf.getJobParameters(), new IPCResponder<JobId>(handle, mid), predistributed));
break;
case GET_DATASET_DIRECTORY_SERIVICE_INFO:
ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 2150bdd..c10e472 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -513,7 +513,7 @@
}
byte[] jagBytes = changed ? acgBytes : null;
node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
- connectorPolicies, jobRun.getFlags());
+ connectorPolicies, jobRun.getFlags(), acg.getJobParameterByteStore().getParameterMap());
}
}
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 55a7a82..917bc2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -117,10 +117,11 @@
//Run a Pre-distributed job by passing the JobId
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
- PreDistributedJobDescriptor distributedJobDescriptor)
+ PreDistributedJobDescriptor distributedJobDescriptor, Map<byte[], byte[]> jobParameters)
throws HyracksException {
this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
+ acg.getJobParameterByteStore().setParameters(jobParameters);
Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
this.scheduler = new JobExecutor(ccs, this, constaints, true);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 2dbb631..083d961 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,7 +18,8 @@
*/
package org.apache.hyracks.control.cc.work;
-import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
@@ -36,14 +37,16 @@
public class JobStartWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final byte[] acggfBytes;
- private final EnumSet<JobFlag> jobFlags;
+ private final Set<JobFlag> jobFlags;
private final DeploymentId deploymentId;
private final JobId jobId;
private final IResultCallback<JobId> callback;
private final boolean predestributed;
+ private final Map<byte[], byte[]> jobParameters;
public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
- EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) {
+ Set<JobFlag> jobFlags, JobId jobId, Map<byte[], byte[]> jobParameters,
+ IResultCallback<JobId> callback, boolean predestributed) {
this.deploymentId = deploymentId;
this.jobId = jobId;
this.ccs = ccs;
@@ -51,6 +54,7 @@
this.jobFlags = jobFlags;
this.callback = callback;
this.predestributed = predestributed;
+ this.jobParameters = jobParameters;
}
@Override
@@ -69,7 +73,7 @@
} else {
//ActivityClusterGraph has already been distributed
run = new JobRun(ccs, deploymentId, jobId,
- ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+ ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId), jobParameters);
}
jobManager.add(run);
callback.setValue(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index a10f8f0..9297f97 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.control.common.base;
import java.net.URL;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,7 +37,7 @@
public interface INodeController {
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags) throws Exception;
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters) throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4eb1732..cf1b1f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -753,16 +753,19 @@
private final List<TaskAttemptDescriptor> taskDescriptors;
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
private final Set<JobFlag> flags;
+ private final Map<byte[], byte[]> jobParameters;
public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags) {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags,
+ Map<byte[], byte[]> jobParameters) {
this.deploymentId = deploymentId;
this.jobId = jobId;
this.planBytes = planBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPolicies = connectorPolicies;
this.flags = flags;
+ this.jobParameters = jobParameters;
}
@Override
@@ -776,6 +779,10 @@
public JobId getJobId() {
return jobId;
+ }
+
+ public Map<byte[], byte[]> getJobParameters() {
+ return jobParameters;
}
public byte[] getPlanBytes() {
@@ -838,7 +845,27 @@
flags.add(JobFlag.values()[(dis.readInt())]);
}
- return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+ // read job parameters
+ int runTimeVarsSize = dis.readInt();
+ Map<byte[], byte[]> jobParameters = new HashMap<>();
+ for (int i = 0; i < runTimeVarsSize; i++) {
+ int nameLength = dis.readInt();
+ byte[] nameBytes = null;
+ if (nameLength >= 0) {
+ nameBytes = new byte[nameLength];
+ dis.read(nameBytes, 0, nameLength);
+ }
+ int valueLength = dis.readInt();
+ byte[] valueBytes = null;
+ if (valueLength >= 0) {
+ valueBytes = new byte[valueLength];
+ dis.read(valueBytes, 0, valueLength);
+ }
+ jobParameters.put(nameBytes, valueBytes);
+ }
+
+ return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags,
+ jobParameters);
}
public static void serialize(OutputStream out, Object object) throws Exception {
@@ -876,6 +903,16 @@
for (JobFlag flag : fn.flags) {
dos.writeInt(flag.ordinal());
}
+
+ //write job parameters
+ dos.writeInt(fn.jobParameters.size());
+ for (Entry<byte[], byte[]> entry : fn.jobParameters.entrySet()) {
+ dos.writeInt(entry.getKey().length);
+ dos.write(entry.getKey(), 0, entry.getKey().length);
+ dos.writeInt(entry.getValue().length);
+ dos.write(entry.getValue(), 0, entry.getValue().length);
+ }
+
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2a8464e..0e9033a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -46,9 +46,9 @@
@Override
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags) throws Exception {
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters) throws Exception {
CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
- taskDescriptors, connectorPolicies, flags);
+ taskDescriptors, connectorPolicies, flags, jobParameters);
ipcHandle.send(-1, stf, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c416942..7defbda 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -62,8 +62,10 @@
return;
case START_TASKS:
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
- ncs.getWorkQueue().schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(),
- stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+ ncs.getWorkQueue()
+ .schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
+ stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags(),
+ stf.getJobParameters()));
return;
case ABORT_TASKS:
CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7224b49..8f41357 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -411,4 +411,9 @@
public Object getSharedObject() {
return sharedObject;
}
+
+ @Override
+ public byte[] getJobParameter(byte[] name, int start, int length) {
+ return joblet.getActivityClusterGraph().getJobParameterByteStore().getParameterValue(name, start, length);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..e6d5f7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -86,9 +86,12 @@
private final Set<JobFlag> flags;
+ private final Map<byte[], byte[]> jobParameters;
+
public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags) {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags,
+ Map<byte[], byte[]> jobParameters) {
this.ncs = ncs;
this.deploymentId = deploymentId;
this.jobId = jobId;
@@ -96,6 +99,7 @@
this.taskDescriptors = taskDescriptors;
this.connectorPoliciesMap = connectorPoliciesMap;
this.flags = flags;
+ this.jobParameters = jobParameters;
}
@Override
@@ -194,6 +198,7 @@
}
acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
}
+ acg.getJobParameterByteStore().setParameters(jobParameters);
ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..31fcc1f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -23,6 +23,7 @@
import static org.mockito.Mockito.verify;
import java.io.File;
+import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -126,7 +127,7 @@
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2) != null);
//run the first job
- hcc.startJob(jobId1);
+ hcc.startJob(jobId1, new HashMap<>());
hcc.waitForCompletion(jobId1);
//destroy the first job
@@ -142,7 +143,7 @@
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
//run the second job
- hcc.startJob(jobId2);
+ hcc.startJob(jobId2, new HashMap<>());
hcc.waitForCompletion(jobId2);
//wait ten seconds to ensure the result sweeper does not break the job
@@ -150,7 +151,7 @@
Thread.sleep(10000);
//run the second job again
- hcc.startJob(jobId2);
+ hcc.startJob(jobId2, new HashMap<>());
hcc.waitForCompletion(jobId2);
//destroy the second job
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..e1192c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -155,4 +155,9 @@
public Object getSharedObject() {
return sharedObject;
}
+
+ @Override
+ public byte[] getJobParameter(byte[] name, int start, int length) {
+ return new byte[0];
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/5296/ (4/6)
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-verify-storage/353/ (6/6)
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-no-installer-app/430/ (2/6)
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-asterix-app/613/ (3/6)
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has abandoned this change.
Change subject: Build Test do not merge
......................................................................
Abandoned
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: abandon
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-asterix-app/429/ (1/6)
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1: Integration-Tests+1
Integration Tests Successful
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/2809/ : SUCCESS
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
WARNING: THIS CHANGE CONTAINS CROSS-PRODUCT CHANGES IN:
* asterixdb
* hyracks-fullstack
PLEASE REVIEW CAREFULLY AND LOOK FOR API CHANGES!
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/2809/
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Build Test do not merge
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Build Test do not merge
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-sonar/3817/ (5/6)
--
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No