You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org> on 2017/05/16 21:32:30 UTC

Change in asterixdb[master]: Build Test do not merge

Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1752

Change subject: Build Test do not merge
......................................................................

Build Test do not merge

Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
28 files changed, 288 insertions(+), 55 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/52/1752/1

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index b4ed8e5..5a1437e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -54,13 +54,13 @@
                 if (entityId != null) {
                     IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
                     LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind());
-                    if (event.getEventKind() == Kind.JOB_FINISHED) {
-                        LOGGER.log(Level.FINER, "Removing the job");
-                        jobId2ActiveJobInfos.remove(event.getJobId());
-                    }
                     if (listener != null) {
                         LOGGER.log(Level.FINER, "Notifying the listener");
                         listener.notify(event);
+                    } else if (event.getEventKind() == Kind.JOB_FINISHED) {
+                        LOGGER.log(Level.FINER, "Removing the job");
+                        jobId2ActiveJobInfos.remove(event.getJobId());
+
                     }
 
                 } else {
@@ -75,9 +75,11 @@
         LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
     }
 
-    public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+    public synchronized void removeListener(IActiveEntityEventsListener listener, JobId jobId) throws HyracksDataException {
         LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
         unregisterListener(listener);
+        LOGGER.log(Level.FINER, "Removing the job");
+        jobId2ActiveJobInfos.remove(jobId);
     }
 
     public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index 6f24fc5..345d670 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -131,6 +131,8 @@
 import org.apache.asterix.runtime.evaluators.constructors.AYearMonthDurationConstructorDescriptor;
 import org.apache.asterix.runtime.evaluators.constructors.ClosedRecordConstructorDescriptor;
 import org.apache.asterix.runtime.evaluators.constructors.OpenRecordConstructorDescriptor;
+import org.apache.asterix.runtime.evaluators.constructors.OrderedListConstructorDescriptor;
+import org.apache.asterix.runtime.evaluators.constructors.UnorderedListConstructorDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
@@ -155,6 +157,7 @@
 import org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.GetJobParameterDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor;
@@ -197,7 +200,6 @@
 import org.apache.asterix.runtime.evaluators.functions.NumericTruncDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.NumericUnaryMinusDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.OrDescriptor;
-import org.apache.asterix.runtime.evaluators.constructors.OrderedListConstructorDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.PrefixLenJaccardDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardCheckDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardDescriptor;
@@ -245,7 +247,6 @@
 import org.apache.asterix.runtime.evaluators.functions.SubstringDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SwitchCaseDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.UUIDDescriptor;
-import org.apache.asterix.runtime.evaluators.constructors.UnorderedListConstructorDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.WordTokensDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.binary.BinaryConcatDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.binary.BinaryLengthDescriptor;
@@ -419,6 +420,9 @@
         // Inject failure function
         temp.add(InjectFailureDescriptor.FACTORY);
 
+        // Get Job Parameter function
+        temp.add(GetJobParameterDescriptor.FACTORY);
+
         // Switch case
         temp.add(SwitchCaseDescriptor.FACTORY);
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b7aae59..3cbfc9b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -762,7 +762,7 @@
 
     }
 
-    protected void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
@@ -1630,7 +1630,7 @@
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
         String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
         cfs.getSignature().setNamespace(dataverse);
-        String functionName = cfs.getaAterixFunction().getName();
+        String functionName = cfs.getSignature().getName();
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1641,7 +1641,7 @@
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
             }
-            Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
+            Function function = new Function(dataverse, functionName, cfs.getSignature().getArity(),
                     cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), Function.LANGUAGE_AQL,
                     FunctionKind.SCALAR.toString());
             MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 6f3b667..5172686 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -110,7 +110,7 @@
         JobStatus status = hcc.getJobStatus(jobId);
         state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
         ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        activeLcListener.getNotificationHandler().removeListener(this);
+        activeLcListener.getNotificationHandler().removeListener(this, jobId);
     }
 
     private void start(ActiveEvent event) {
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 6d74957..2b7cad6 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -34,10 +34,6 @@
     private final boolean ifNotExists;
     private final List<String> paramList;
 
-    public FunctionSignature getaAterixFunction() {
-        return signature;
-    }
-
     public String getFunctionBody() {
         return functionBody;
     }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 0769596..ca75987 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -797,6 +797,9 @@
     public static final FunctionIdentifier EXTERNAL_LOOKUP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "external-lookup", FunctionIdentifier.VARARGS);
 
+    public static final FunctionIdentifier GET_JOB_PARAMETER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "get-job-param", 1);
+
     public static final FunctionIdentifier META = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta",
             FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier META_KEY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
@@ -1172,6 +1175,9 @@
         // external lookup
         addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false);
 
+        // get job parameter
+        addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false);
+
         // unnesting function
         addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true);
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
index 1f9909c..4a421cc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
@@ -22,8 +22,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -48,7 +48,7 @@
     final GrowableArray resultArray = new GrowableArray();
     final UTF8StringBuilder resultBuilder = new UTF8StringBuilder();
     private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
-    private final DataOutput dataOutput = resultStorage.getDataOutput();
+    protected final DataOutput dataOutput = resultStorage.getDataOutput();
     private final FunctionIdentifier funcID;
 
     AbstractUnaryStringStringEval(IHyracksTaskContext context, IScalarEvaluatorFactory argEvalFactory,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
new file mode 100644
index 0000000..9d73dcc
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class GetJobParameterDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GetJobParameterDescriptor();
+        }
+    };
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws HyracksDataException {
+                return new AbstractUnaryStringStringEval(ctx, args[0],
+                        GetJobParameterDescriptor.this.getIdentifier()) {
+                    private byte[] result;
+
+                    @Override
+                    protected void process(UTF8StringPointable inputString, IPointable resultPointable)
+                            throws IOException {
+                        result = ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+                                inputString.getLength());
+                    }
+
+                    @Override
+                    void writeResult(IPointable resultPointable) throws IOException {
+                        resultPointable.set(result, 0, result.length);
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.GET_JOB_PARAMETER;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa9232e..fecce67 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -20,8 +20,9 @@
 
 import java.io.Serializable;
 import java.net.URL;
-import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -165,32 +166,39 @@
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
-        private final EnumSet<JobFlag> jobFlags;
+        private final Set<JobFlag> jobFlags;
         private final DeploymentId deploymentId;
         private final JobId jobId;
+        private final Map<byte[], byte[]> jobParameters;
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags, JobId jobId,
+                Map<byte[], byte[]> jobParameters) {
             this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
             this.deploymentId = deploymentId;
             this.jobId = jobId;
+            this.jobParameters = jobParameters;
         }
 
-        public StartJobFunction(JobId jobId) {
-            this(null, null, null, jobId);
+        public StartJobFunction(JobId jobId, Map<byte[], byte[]> jobParameters) {
+            this(null, null, null, jobId, jobParameters);
         }
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(null, acggfBytes, jobFlags, null);
+        public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(null, acggfBytes, jobFlags, null, null);
         }
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(deploymentId, acggfBytes, jobFlags, null);
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(deploymentId, acggfBytes, jobFlags, null, null);
         }
 
         @Override
         public FunctionId getFunctionId() {
             return FunctionId.START_JOB;
+        }
+
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
         }
 
         public JobId getJobId() {
@@ -201,7 +209,7 @@
             return acggfBytes;
         }
 
-        public EnumSet<JobFlag> getJobFlags() {
+        public Set<JobFlag> getJobFlags() {
             return jobFlags;
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..5ff621d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -76,9 +76,9 @@
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
+    public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
+                new HyracksClientInterfaceFunctions.StartJobFunction(jobId, jobParameters);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..574c9a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -120,8 +120,12 @@
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
-        return hci.startJob(jobId);
+    public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception {
+        JobStatus status = getJobStatus(jobId);
+        if (status != null && status.equals(JobStatus.RUNNING)) {
+            throw new HyracksException("Tried to run a pre-distributed job while already running");
+        }
+        return hci.startJob(jobId, jobParameters);
     }
 
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..3b6b57a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -20,6 +20,7 @@
 
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -110,9 +111,11 @@
      *
      * @param jobId
      *            The id of the predistributed job
+     * @param jobParameters
+     *            The serialized job parameters
      * @throws Exception
      */
-    public JobId startJob(JobId jobId) throws Exception;
+    public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     /**
      * Start the specified Job.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..ec4026b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,7 +38,7 @@
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 
-    public JobId startJob(JobId jobId) throws Exception;
+    public JobId startJob(JobId jobId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     public void cancelJob(JobId jobId) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index c8e4cf8..612fa36 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -48,4 +48,6 @@
     void setSharedObject(Object object);
 
     Object getSharedObject();
+
+    public byte[] getJobParameter(byte[] name, int start, int length);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
index b64e2d5..647522c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
@@ -24,13 +24,14 @@
 import java.util.List;
 import java.util.Map;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class ActivityClusterGraph implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -42,6 +43,8 @@
     private final Map<ActivityId, ActivityCluster> activityMap;
 
     private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
+
+    private JobParameterByteStore jobParameterByteStore;
 
     private int frameSize;
 
@@ -62,6 +65,7 @@
         activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
         activityMap = new HashMap<ActivityId, ActivityCluster>();
         connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+        jobParameterByteStore = new JobParameterByteStore();
         frameSize = 32768;
         reportTaskDetails = true;
     }
@@ -74,6 +78,10 @@
         return connectorMap;
     }
 
+    public JobParameterByteStore getJobParameterByteStore() {
+        return jobParameterByteStore;
+    }
+
     public Map<ActivityClusterId, ActivityCluster> getActivityClusterMap() {
         return activityClusterMap;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
new file mode 100644
index 0000000..e2e324b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobParameterByteStore implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Map<byte[], byte[]> vars;
+    private final byte[] empty = new byte[0];
+
+    public JobParameterByteStore() {
+        vars = new HashMap<>();
+    }
+
+    public Map<byte[], byte[]> getParameterMap() {
+        return vars;
+    }
+
+    public void setParameters(Map<byte[], byte[]> map) {
+        vars = map;
+    }
+
+    public byte[] getParameterValue(byte[] name, int start, int length) {
+        for (Entry<byte[], byte[]> entry : vars.entrySet()) {
+            byte[] key = entry.getKey();
+            if (key.length == length) {
+                boolean matched = true;
+                for (int j = 0; j < length; j++) {
+                    if (key[j] != name[j + start]) {
+                        matched = false;
+                        break;
+                    }
+                }
+                if (matched) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return empty;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index ced3d67..a32d5f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -116,7 +116,7 @@
                     predistributed = true;
                 }
                 ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
-                        jobId, new IPCResponder<JobId>(handle, mid), predistributed));
+                        jobId, sjf.getJobParameters(), new IPCResponder<JobId>(handle, mid), predistributed));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                 ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 2150bdd..c10e472 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -513,7 +513,7 @@
                     }
                     byte[] jagBytes = changed ? acgBytes : null;
                     node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
-                            connectorPolicies, jobRun.getFlags());
+                            connectorPolicies, jobRun.getFlags(), acg.getJobParameterByteStore().getParameterMap());
                 }
             }
         } catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 55a7a82..917bc2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -117,10 +117,11 @@
 
     //Run a Pre-distributed job by passing the JobId
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
-            PreDistributedJobDescriptor distributedJobDescriptor)
+            PreDistributedJobDescriptor distributedJobDescriptor, Map<byte[], byte[]> jobParameters)
             throws HyracksException {
         this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
                 distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
+        acg.getJobParameterByteStore().setParameters(jobParameters);
         Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
         this.scheduler = new JobExecutor(ccs, this, constaints, true);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 2dbb631..083d961 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,7 +18,8 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
@@ -36,14 +37,16 @@
 public class JobStartWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
-    private final EnumSet<JobFlag> jobFlags;
+    private final Set<JobFlag> jobFlags;
     private final DeploymentId deploymentId;
     private final JobId jobId;
     private final IResultCallback<JobId> callback;
     private final boolean predestributed;
+    private final Map<byte[], byte[]> jobParameters;
 
     public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
-            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) {
+            Set<JobFlag> jobFlags, JobId jobId, Map<byte[], byte[]> jobParameters,
+            IResultCallback<JobId> callback, boolean predestributed) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
         this.ccs = ccs;
@@ -51,6 +54,7 @@
         this.jobFlags = jobFlags;
         this.callback = callback;
         this.predestributed = predestributed;
+        this.jobParameters = jobParameters;
     }
 
     @Override
@@ -69,7 +73,7 @@
             } else {
                 //ActivityClusterGraph has already been distributed
                 run = new JobRun(ccs, deploymentId, jobId,
-                        ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+                        ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId), jobParameters);
             }
             jobManager.add(run);
             callback.setValue(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index a10f8f0..9297f97 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.control.common.base;
 
 import java.net.URL;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,7 +37,7 @@
 public interface INodeController {
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception;
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4eb1732..cf1b1f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -753,16 +753,19 @@
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
         private final Set<JobFlag> flags;
+        private final Map<byte[], byte[]> jobParameters;
 
         public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
                 List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags) {
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags,
+                Map<byte[], byte[]> jobParameters) {
             this.deploymentId = deploymentId;
             this.jobId = jobId;
             this.planBytes = planBytes;
             this.taskDescriptors = taskDescriptors;
             this.connectorPolicies = connectorPolicies;
             this.flags = flags;
+            this.jobParameters = jobParameters;
         }
 
         @Override
@@ -776,6 +779,10 @@
 
         public JobId getJobId() {
             return jobId;
+        }
+
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
         }
 
         public byte[] getPlanBytes() {
@@ -838,7 +845,27 @@
                 flags.add(JobFlag.values()[(dis.readInt())]);
             }
 
-            return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+            // read job parameters
+            int runTimeVarsSize = dis.readInt();
+            Map<byte[], byte[]> jobParameters = new HashMap<>();
+            for (int i = 0; i < runTimeVarsSize; i++) {
+                int nameLength = dis.readInt();
+                byte[] nameBytes = null;
+                if (nameLength >= 0) {
+                    nameBytes = new byte[nameLength];
+                    dis.read(nameBytes, 0, nameLength);
+                }
+                int valueLength = dis.readInt();
+                byte[] valueBytes = null;
+                if (valueLength >= 0) {
+                    valueBytes = new byte[valueLength];
+                    dis.read(valueBytes, 0, valueLength);
+                }
+                jobParameters.put(nameBytes, valueBytes);
+            }
+
+            return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags,
+                    jobParameters);
         }
 
         public static void serialize(OutputStream out, Object object) throws Exception {
@@ -876,6 +903,16 @@
             for (JobFlag flag : fn.flags) {
                 dos.writeInt(flag.ordinal());
             }
+
+            //write job parameters
+            dos.writeInt(fn.jobParameters.size());
+            for (Entry<byte[], byte[]> entry : fn.jobParameters.entrySet()) {
+                dos.writeInt(entry.getKey().length);
+                dos.write(entry.getKey(), 0, entry.getKey().length);
+                dos.writeInt(entry.getValue().length);
+                dos.write(entry.getValue(), 0, entry.getValue().length);
+            }
+
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2a8464e..0e9033a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -46,9 +46,9 @@
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception {
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters) throws Exception {
         CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
-                taskDescriptors, connectorPolicies, flags);
+                taskDescriptors, connectorPolicies, flags, jobParameters);
         ipcHandle.send(-1, stf, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c416942..7defbda 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -62,8 +62,10 @@
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                ncs.getWorkQueue().schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(),
-                        stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                ncs.getWorkQueue()
+                        .schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
+                                stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags(),
+                                stf.getJobParameters()));
                 return;
             case ABORT_TASKS:
                 CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7224b49..8f41357 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -411,4 +411,9 @@
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) {
+        return joblet.getActivityClusterGraph().getJobParameterByteStore().getParameterValue(name, start, length);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..e6d5f7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -86,9 +86,12 @@
 
     private final Set<JobFlag> flags;
 
+    private final Map<byte[], byte[]> jobParameters;
+
     public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags) {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags,
+            Map<byte[], byte[]> jobParameters) {
         this.ncs = ncs;
         this.deploymentId = deploymentId;
         this.jobId = jobId;
@@ -96,6 +99,7 @@
         this.taskDescriptors = taskDescriptors;
         this.connectorPoliciesMap = connectorPoliciesMap;
         this.flags = flags;
+        this.jobParameters = jobParameters;
     }
 
     @Override
@@ -194,6 +198,7 @@
                 }
                 acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
             }
+            acg.getJobParameterByteStore().setParameters(jobParameters);
             ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..31fcc1f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.verify;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -126,7 +127,7 @@
         Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2) != null);
 
         //run the first job
-        hcc.startJob(jobId1);
+        hcc.startJob(jobId1, new HashMap<>());
         hcc.waitForCompletion(jobId1);
 
         //destroy the first job
@@ -142,7 +143,7 @@
         cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
 
         //run the second job
-        hcc.startJob(jobId2);
+        hcc.startJob(jobId2, new HashMap<>());
         hcc.waitForCompletion(jobId2);
 
         //wait ten seconds to ensure the result sweeper does not break the job
@@ -150,7 +151,7 @@
         Thread.sleep(10000);
 
         //run the second job again
-        hcc.startJob(jobId2);
+        hcc.startJob(jobId2, new HashMap<>());
         hcc.waitForCompletion(jobId2);
 
         //destroy the second job
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..e1192c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -155,4 +155,9 @@
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) {
+        return new byte[0];
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/5296/ (4/6)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-verify-storage/353/ (6/6)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-no-installer-app/430/ (2/6)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-asterix-app/613/ (3/6)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has abandoned this change.

Change subject: Build Test do not merge
......................................................................


Abandoned

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: abandon
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-asterix-app/429/ (1/6)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1: Integration-Tests+1

Integration Tests Successful

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/2809/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

WARNING: THIS CHANGE CONTAINS CROSS-PRODUCT CHANGES IN:
* asterixdb
* hyracks-fullstack

PLEASE REVIEW CAREFULLY AND LOOK FOR API CHANGES!

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/2809/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Build Test do not merge

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Build Test do not merge
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-sonar/3817/ (5/6)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1752
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I7e970a0cd345eb34c79b026623b87a7407ed5207
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No