You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:55 UTC
[24/78] [abbrv] [partial] incubator-systemml git commit: Move files
to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java b/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java
deleted file mode 100644
index e5b63f5..0000000
--- a/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java
+++ /dev/null
@@ -1,4574 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.lops.compile;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.DMLScript.RUNTIME_PLATFORM;
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.conf.DMLConfig;
-import com.ibm.bi.dml.hops.AggBinaryOp;
-import com.ibm.bi.dml.hops.BinaryOp;
-import com.ibm.bi.dml.hops.Hop.FileFormatTypes;
-import com.ibm.bi.dml.hops.HopsException;
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.lops.AppendM;
-import com.ibm.bi.dml.lops.BinaryM;
-import com.ibm.bi.dml.lops.CombineBinary;
-import com.ibm.bi.dml.lops.Data;
-import com.ibm.bi.dml.lops.PMMJ;
-import com.ibm.bi.dml.lops.ParameterizedBuiltin;
-import com.ibm.bi.dml.lops.SortKeys;
-import com.ibm.bi.dml.lops.Data.OperationTypes;
-import com.ibm.bi.dml.lops.FunctionCallCP;
-import com.ibm.bi.dml.lops.Lop;
-import com.ibm.bi.dml.lops.Lop.Type;
-import com.ibm.bi.dml.lops.LopProperties.ExecLocation;
-import com.ibm.bi.dml.lops.LopProperties.ExecType;
-import com.ibm.bi.dml.lops.LopsException;
-import com.ibm.bi.dml.lops.MapMult;
-import com.ibm.bi.dml.lops.OutputParameters;
-import com.ibm.bi.dml.lops.OutputParameters.Format;
-import com.ibm.bi.dml.lops.PickByCount;
-import com.ibm.bi.dml.lops.Unary;
-import com.ibm.bi.dml.parser.DataExpression;
-import com.ibm.bi.dml.parser.Expression;
-import com.ibm.bi.dml.parser.ParameterizedBuiltinFunctionExpression;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.StatementBlock;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ProgramConverter;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.instructions.CPInstructionParser;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-import com.ibm.bi.dml.runtime.instructions.Instruction.INSTRUCTION_TYPE;
-import com.ibm.bi.dml.runtime.instructions.InstructionParser;
-import com.ibm.bi.dml.runtime.instructions.SPInstructionParser;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.VariableCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction.CPINSTRUCTION_TYPE;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.sort.PickFromCompactInputFormat;
-
-
-
-/**
- * Class to maintain a DAG of lops and compile it into a list of
- * instructions (jobs).
- *
- * @param <N> type of the lops stored as nodes of this DAG
- */
-public class Dag<N extends Lop>
-{
-
- // logger for this class
- private static final Log LOG = LogFactory.getLog(Dag.class.getName());
-
- // Result codes that canEliminateLop() compares against the value returned
- // by getChildAlignment(). The two *_DOES_NOT_BREAK_ALIGNMENT codes allow an
- // aligner lop to be eliminated; all other codes do not.
- private static final int CHILD_BREAKS_ALIGNMENT = 2;
- private static final int CHILD_DOES_NOT_BREAK_ALIGNMENT = 1;
- private static final int MRCHILD_NOT_FOUND = 0;
- private static final int MR_CHILD_FOUND_BREAKS_ALIGNMENT = 4;
- private static final int MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT = 5;
-
- // static ID sequences, created once in the static initializer below
- // NOTE(review): presumably used to number jobs and temporary variables; the consumers are not visible in this part of the file
- private static IDSequence job_id = null;
- private static IDSequence var_index = null;
-
- // number of reducers, read from DMLConfig.NUM_REDUCERS (constructor and getJobs)
- private int total_reducers = -1;
- // scratch space location, set in getJobs() from DMLConfig.SCRATCH_SPACE plus a trailing "/"
- private String scratch = "";
- // lazily built and cached by getFilePath(); empty string means "not built yet"
- private String scratchFilePath = "";
-
- // reset to 0 at the start of every grouping iteration in doGreedyGrouping()
- private double gmrMapperFootprint = 0;
-
- static {
- job_id = new IDSequence();
- var_index = new IDSequence();
- }
-
- // list of all nodes in the dag (filled by addNode(), which rejects duplicates)
- private ArrayList<N> nodes = null;
-
- /*
- * Hash map that translates the nodes in the DAG to a sequence of numbers
- * key: Lop ID
- * value: Sequence Number (0 ... |DAG|)
- *
- * This map is primarily used in performing DFS on the DAG, and subsequently in performing ancestor-descendant checks.
- */
- private HashMap<Long, Integer> IDMap = null;
-
-
- /**
-  * Bundles what is recorded for the output of a node: file name, variable
-  * name, output info, and three instruction lists (pre, post, last).
-  */
- private class NodeOutput {
-     // descriptive attributes; null until assigned through the setters
-     String fileName = null;
-     String varName = null;
-     OutputInfo outInfo = null;
-
-     // instructions added before a MR instruction
-     ArrayList<Instruction> preInstructions = new ArrayList<Instruction>();
-     // instructions added after a MR instruction
-     ArrayList<Instruction> postInstructions = new ArrayList<Instruction>();
-     ArrayList<Instruction> lastInstructions = new ArrayList<Instruction>();
-
-     NodeOutput() {
-         // all state is set up by the field initializers above
-     }
-
-     // ---- file name / variable name / output info ----
-
-     public String getFileName() {
-         return this.fileName;
-     }
-
-     public void setFileName(String fileName) {
-         this.fileName = fileName;
-     }
-
-     public String getVarName() {
-         return this.varName;
-     }
-
-     public void setVarName(String varName) {
-         this.varName = varName;
-     }
-
-     public OutputInfo getOutInfo() {
-         return this.outInfo;
-     }
-
-     public void setOutInfo(OutputInfo outInfo) {
-         this.outInfo = outInfo;
-     }
-
-     // ---- instruction lists (the getters expose the live lists, not copies) ----
-
-     public ArrayList<Instruction> getPreInstructions() {
-         return this.preInstructions;
-     }
-
-     public void addPreInstruction(Instruction inst) {
-         this.preInstructions.add(inst);
-     }
-
-     public ArrayList<Instruction> getPostInstructions() {
-         return this.postInstructions;
-     }
-
-     public void addPostInstruction(Instruction inst) {
-         this.postInstructions.add(inst);
-     }
-
-     public ArrayList<Instruction> getLastInstructions() {
-         return this.lastInstructions;
-     }
-
-     public void addLastInstruction(Instruction inst) {
-         this.lastInstructions.add(inst);
-     }
- }
-
- /**
-  * Returns the scratch file path, building and caching it on first use.
-  * The path is assembled from the scratch location, the process prefix
-  * with the script UUID, and the CP root thread id.
-  *
-  * @return the cached scratch file path
-  */
- private String getFilePath() {
-     if ( !scratchFilePath.isEmpty() )
-         return scratchFilePath;
-
-     // not built yet: assemble the pieces in the same order as before
-     StringBuilder path = new StringBuilder();
-     path.append(scratch);
-     path.append(Lop.FILE_SEPARATOR);
-     path.append(Lop.PROCESS_PREFIX).append(DMLScript.getUUID());
-     path.append(Lop.FILE_SEPARATOR).append(Lop.FILE_SEPARATOR);
-     path.append(ProgramConverter.CP_ROOT_THREAD_ID);
-     path.append(Lop.FILE_SEPARATOR);
-     scratchFilePath = path.toString();
-     return scratchFilePath;
- }
-
- /**
-  * Creates an empty DAG and reads the number of reducers from the
-  * current DML configuration.
-  */
- public Dag()
- {
-     // number of reducers as configured in the dml config
-     total_reducers = ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS);
-
-     // internal data structures
-     IDMap = new HashMap<Long, Integer>();
-     nodes = new ArrayList<N>();
- }
-
- /**
-  * Compiles this DAG without an associated statement block; delegates to
-  * {@link #getJobs(StatementBlock, DMLConfig)} with a null block.
-  *
-  * @param config DML configuration (may be null, see the delegate)
-  * @return the generated instructions
-  * @throws LopsException
-  * @throws IOException
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- public ArrayList<Instruction> getJobs(DMLConfig config)
-         throws LopsException, IOException, DMLRuntimeException, DMLUnsupportedOperationException
- {
-     final StatementBlock noStatementBlock = null;
-     return getJobs(noStatementBlock, config);
- }
-
- /**
-  * Compiles this DAG generically: orders the nodes topologically and
-  * then groups them greedily into instructions.
-  *
-  * @param sb statement block the DAG belongs to (may be null)
-  * @param config if non-null, supplies the number of reducers and the scratch space
-  * @return the generated instructions
-  * @throws LopsException
-  * @throws IOException
-  * @throws DMLUnsupportedOperationException
-  * @throws DMLRuntimeException
-  */
- public ArrayList<Instruction> getJobs(StatementBlock sb, DMLConfig config)
-         throws LopsException, IOException, DMLRuntimeException,
-         DMLUnsupportedOperationException {
-
-     if (config != null) {
-         total_reducers = config.getIntValue(DMLConfig.NUM_REDUCERS);
-         scratch = config.getTextValue(DMLConfig.SCRATCH_SPACE) + "/";
-     }
-
-     // work on a copy so that ordering does not disturb the node list itself
-     ArrayList<N> ordered = new ArrayList<N>(nodes);
-
-     /*
-      * Topological order:
-      * 1) all nodes of level i come before the nodes of level i+1;
-      * 2) within a level, nodes are ordered by ID, i.e. by creation order.
-      */
-     doTopologicalSort_strict_order(ordered);
-
-     // greedy grouping of operations
-     return doGreedyGrouping(sb, ordered);
- }
-
- /**
-  * Generates remove instructions for non-scalar transient reads that are
-  * still registered after all transient writes have been examined.
-  * This method is currently unused.
-  *
-  * @param nodeV nodes of the DAG
-  * @param inst list that receives the generated remove instructions
-  * @throws DMLUnsupportedOperationException
-  * @throws DMLRuntimeException
-  */
- @SuppressWarnings("unused")
- private void deleteUnwantedTransientReadVariables(ArrayList<N> nodeV,
-         ArrayList<Instruction> inst) throws DMLRuntimeException,
-         DMLUnsupportedOperationException {
-     LOG.trace("In delete unwanted variables");
-
-     // pass 1: register every transient read under its label
-     HashMap<String, N> pendingReads = new HashMap<String, N>();
-     for (N node : nodeV) {
-         if (node.getExecLocation() != ExecLocation.Data)
-             continue;
-         Data data = (Data) node;
-         if (data.isTransient() && data.getOperationType() == OperationTypes.READ)
-             pendingReads.put(node.getOutputParameters().getLabel(), node);
-     }
-
-     // pass 2: every transient write unregisters exactly one label
-     for (N node : nodeV) {
-         if (node.getExecLocation() != ExecLocation.Data)
-             continue;
-         Data data = (Data) node;
-         if (!data.isTransient() || data.getOperationType() != OperationTypes.WRITE)
-             continue;
-
-         Lop source = node.getInputs().get(0);
-         String labelToDrop;
-         if (source.getExecLocation() == ExecLocation.Data) {
-             // the write is a simple copy and not the result of a computation:
-             // keep the input variable so that a copy instruction can be generated
-             labelToDrop = source.getOutputParameters().getLabel();
-         }
-         else {
-             // the write is the result of a computation: drop the read that
-             // carries the same label as the write (no-op if there is none)
-             labelToDrop = node.getOutputParameters().getLabel();
-         }
-         pendingReads.remove(labelToDrop);
-     }
-
-     // pass 3: emit a remove instruction for each remaining non-scalar read
-     for (Entry<String, N> remaining : pendingReads.entrySet()) {
-         N node = remaining.getValue();
-         if (((Data) node).getDataType() == DataType.SCALAR)
-             continue; // nothing is generated for scalars
-
-         Instruction removal = VariableCPInstruction.prepareRemoveInstruction(remaining.getKey());
-         removal.setLocation(node);
-
-         if( LOG.isTraceEnabled() )
-             LOG.trace(removal.toString());
-         inst.add(removal);
-     }
- }
-
- /**
-  * Generates remove instructions for matrix variables that are read
-  * transiently and overwritten by a transient write with the same label,
-  * provided the old value does not feed into any write. Does nothing when
-  * <code>sb</code> is null.
-  *
-  * @param sb statement block; only checked for null here
-  * @param nodeV nodes of the DAG
-  * @param inst list that receives the generated remove instructions
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
-         ArrayList<Instruction> inst) throws DMLRuntimeException,
-         DMLUnsupportedOperationException {
-
-     if ( sb == null )
-         return;
-
-     LOG.trace("In delete updated variables");
-
-     // CANDIDATES: transient matrix reads whose value does not feed a write
-     HashMap<String, N> candidateReads = new HashMap<String, N>();
-     // ACTUAL updates: labels whose old value is no longer used, and the
-     // write node that supplies the location of the remove instruction
-     HashSet<String> updatedLabels = new HashSet<String>();
-     HashMap<String, N> writeByLabel = new HashMap<String, N>();
-
-     // pass 1: collect the candidate reads
-     for (N node : nodeV) {
-         if (node.getExecLocation() != ExecLocation.Data)
-             continue;
-         Data data = (Data) node;
-         if (!data.isTransient()
-                 || data.getOperationType() != OperationTypes.READ
-                 || data.getDataType() != DataType.MATRIX)
-             continue;
-
-         // a parent of type Data has to be a WRITE; such a read still has
-         // its old value in use and is therefore not a candidate
-         boolean feedsWrite = false;
-         for (Lop parent : node.getOutputs()) {
-             if (parent.getExecLocation() == ExecLocation.Data) {
-                 feedsWrite = true;
-                 break;
-             }
-         }
-         if (!feedsWrite)
-             candidateReads.put(node.getOutputParameters().getLabel(), node);
-     }
-
-     // pass 2: find transient matrix writes that overwrite a candidate
-     for (N node : nodeV) {
-         if (node.getExecLocation() != ExecLocation.Data)
-             continue;
-         Data data = (Data) node;
-         if (!data.isTransient()
-                 || data.getOperationType() != OperationTypes.WRITE
-                 || data.getDataType() != DataType.MATRIX)
-             continue;
-
-         String label = node.getOutputParameters().getLabel();
-         boolean readWithSameLabel = candidateReads.containsKey(label);
-         // skip writes that are fed directly by a candidate transient read
-         if (readWithSameLabel && !candidateReads.containsValue(node.getInputs().get(0))) {
-             updatedLabels.add(label);
-             writeByLabel.put(label, node);
-         }
-     }
-
-     // pass 3: generate the remove instructions
-     for (String label : updatedLabels) {
-         Instruction removal = VariableCPInstruction.prepareRemoveInstruction(label);
-         removal.setLocation(writeByLabel.get(label));
-
-         LOG.trace(removal.toString());
-         inst.add(removal);
-     }
- }
-
- /**
-  * Generates a remove instruction for every variable that is live-in but
-  * not live-out of the statement block. Does nothing when <code>sb</code>
-  * is null.
-  *
-  * @param sb statement block providing the live-in and live-out variables
-  * @param deleteInst list that receives the generated remove instructions
-  * @throws DMLUnsupportedOperationException
-  * @throws DMLRuntimeException
-  */
- private void generateRemoveInstructions(StatementBlock sb,
-         ArrayList<Instruction> deleteInst)
-         throws DMLUnsupportedOperationException, DMLRuntimeException {
-
-     if ( sb == null )
-         return;
-
-     LOG.trace("In generateRemoveInstructions()");
-
-     // RULE 1: if in IN and not in OUT, then there should be an rmvar or rmfilevar inst
-     // (currently required for specific cases of external functions)
-     for (String varName : sb.liveIn().getVariableNames()) {
-         if (sb.liveOut().containsVariable(varName))
-             continue; // still live after the block
-
-         Instruction removal = VariableCPInstruction.prepareRemoveInstruction(varName);
-         // the instruction is attributed to the last line of the block
-         removal.setLocation(sb.getEndLine(), sb.getEndLine(), -1, -1);
-         deleteInst.add(removal);
-
-         if( LOG.isTraceEnabled() )
-             LOG.trace(" Adding " + removal.toString());
-     }
-
-     // RULE 2 (variables in KILL that are neither in IN nor in OUT, i.e.
-     // locals created and used entirely within the block) is not applied here.
- }
-
- /**
- * Method to add a node to the DAG.
- *
- * @param node
- * @return true if node was not already present, false if not.
- */
-
- public boolean addNode(N node) {
- if (nodes.contains(node))
- return false;
- else {
- nodes.add(node);
- return true;
- }
-
- }
-
- /**
-  * Creates <code>size</code> empty node vectors (the caller passes the
-  * number of job types, so there is one vector per job type).
-  *
-  * @param size number of vectors to create; nothing is created if it is not positive
-  * @return list holding the newly created empty vectors
-  */
- private ArrayList<ArrayList<N>> createNodeVectors(int size) {
-     ArrayList<ArrayList<N>> vectors = new ArrayList<ArrayList<N>>();
-     int remaining = size;
-     while (remaining-- > 0) {
-         vectors.add(new ArrayList<N>());
-     }
-     return vectors;
- }
-
- /**
-  * Empties every node vector in <code>arr</code>; the vectors themselves
-  * stay in the list.
-  *
-  * @param arr node vectors to clear
-  */
- private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
-     for (ArrayList<N> vector : arr) {
-         vector.clear();
-     }
- }
-
- private boolean isCompatible(ArrayList<N> nodes, JobType jt, int from,
- int to) throws LopsException {
-
- int base = jt.getBase();
-
- for (int i = from; i < to; i++) {
- if ((nodes.get(i).getCompatibleJobs() & base) == 0) {
- if( LOG.isTraceEnabled() )
- LOG.trace("Not compatible "+ nodes.get(i).toString());
- return false;
- }
- }
- return true;
- }
-
- /**
-  * Determines whether the two input nodes can be executed together in at
-  * least one job, i.e. whether their compatible-job bit masks share a bit.
-  *
-  * @param node1 first node
-  * @param node2 second node
-  * @return true if the two masks have at least one bit in common
-  */
- private boolean isCompatible(N node1, N node2) {
-     // a mask intersection is tested against zero (as in the list-based
-     // overload), so the result does not depend on which bit is shared
-     return (node1.getCompatibleJobs() & node2.getCompatibleJobs()) != 0;
- }
-
- /**
-  * Checks whether the given node can execute in the job specified by
-  * <code>jt</code>.
-  *
-  * @param node node to check
-  * @param jt job type; GMRCELL is checked against GMR
-  * @return true if the node's compatible-job bit mask contains the base of the job type
-  */
- private boolean isCompatible(N node, JobType jt) {
-     // GMRCELL is checked against GMR (addNodeByJobType does the same for
-     // its GMRCELL vector); the parameter itself is left untouched
-     JobType effective = (jt == JobType.GMRCELL) ? JobType.GMR : jt;
-     // a mask intersection is tested against zero, as in the list-based overload
-     return (node.getCompatibleJobs() & effective.getBase()) != 0;
- }
-
- /*
- * Add node, and its relevant children to job-specific node vectors.
- *
- * Three cases are handled, in this order:
- * 1) eliminate == false and the node defines a MR job: the node is added to
- *    the vector of its job type (GMR nodes go to GMRCELL, DATAGEN or GMR, see
- *    below), usually followed by its children, and the method returns.
- * 2) eliminate == true: the node is added to the GMRCELL or GMR vector and
- *    the method returns.
- * 3) otherwise: the node is added to every job vector (job id > 0) that holds
- *    one of its direct children and that it is compatible with; being added
- *    to more than one vector is reported as an error.
- *
- * arr holds one node vector per job type, indexed by JobType id.
- * Throws LopsException if no job type matches, if a compatibility check
- * fails, or if the node ends up in more than one vector.
- */
- private void addNodeByJobType(N node, ArrayList<ArrayList<N>> arr,
- ArrayList<N> execNodes, boolean eliminate) throws LopsException {
-
- if (!eliminate) {
- // Check if this lop defines a MR job.
- if ( node.definesMRJob() ) {
-
- // find the corresponding JobType
- JobType jt = JobType.findJobTypeFromLop(node);
-
- if ( jt == null ) {
- throw new LopsException(node.printErrorLocation() + "No matching JobType is found for a the lop type: " + node.getType() + " \n");
- }
-
- // Add "node" to corresponding job vector
- // In each branch, "from" is taken AFTER the node itself was added, so the
- // compatibility check covers only the entries appended by addChildren().
-
- if ( jt == JobType.GMR ) {
- if ( node.hasNonBlockedInputs() ) {
- int gmrcell_index = JobType.GMRCELL.getId();
- arr.get(gmrcell_index).add(node);
- int from = arr.get(gmrcell_index).size();
- addChildren(node, arr.get(gmrcell_index), execNodes);
- int to = arr.get(gmrcell_index).size();
- if (!isCompatible(arr.get(gmrcell_index),JobType.GMR, from, to)) // check against GMR only, not against GMRCELL
- throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n");
- }
- else {
- // if "node" (in this case, a group lop) has any inputs from RAND
- // then add it to RAND job. Otherwise, create a GMR job
- if (hasChildNode(node, arr.get(JobType.DATAGEN.getId()) )) {
- arr.get(JobType.DATAGEN.getId()).add(node);
- // we should NOT call 'addChildren' because appropriate
- // child nodes would have got added to RAND job already
- } else {
- int gmr_index = JobType.GMR.getId();
- arr.get(gmr_index).add(node);
- int from = arr.get(gmr_index).size();
- addChildren(node, arr.get(gmr_index), execNodes);
- int to = arr.get(gmr_index).size();
- if (!isCompatible(arr.get(gmr_index),JobType.GMR, from, to))
- throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n");
- }
- }
- }
- else {
- // any other job type: the vector index is the job type's id
- int index = jt.getId();
- arr.get(index).add(node);
- int from = arr.get(index).size();
- addChildren(node, arr.get(index), execNodes);
- int to = arr.get(index).size();
- // check if all added nodes are compatible with current job
- if (!isCompatible(arr.get(index), jt, from, to)) {
- throw new LopsException(
- "Unexpected error in addNodeByType.");
- }
- }
- return;
- }
- }
-
- if ( eliminate ) {
- // Eliminated lops are directly added to GMR queue.
- // Note that eliminate flag is set only for 'group' lops
- if ( node.hasNonBlockedInputs() )
- arr.get(JobType.GMRCELL.getId()).add(node);
- else
- arr.get(JobType.GMR.getId()).add(node);
- return;
- }
-
- /*
- * If this lop does not define a job, check if it uses the output of any
- * specialized job. i.e., if this lop has a child node in any of the
- * job-specific vector, then add it to the vector. Note: This lop must
- * be added to ONLY ONE of the job-specific vectors.
- */
-
- int numAdded = 0;
- for ( JobType j : JobType.values() ) {
- // only job types with a positive id are considered
- if ( j.getId() > 0 && hasDirectChildNode(node, arr.get(j.getId()))) {
- if (isCompatible(node, j)) {
- arr.get(j.getId()).add(node);
- numAdded += 1;
- }
- }
- }
- // the check runs after the loop, i.e. the node has already been added to the vectors at this point
- if (numAdded > 1) {
- throw new LopsException("Unexpected error in addNodeByJobType(): A given lop can ONLY be added to a single job vector (numAdded = " + numAdded + ")." );
- }
- }
-
- /*
-  * Remove the node from the node vector of every job type that has a
-  * positive id. This method is invoked from removeNodesForNextIteration().
-  */
- private void removeNodeByJobType(N node, ArrayList<ArrayList<N>> arr) {
-     for ( JobType jt : JobType.values() ) {
-         if ( jt.getId() <= 0 )
-             continue; // job types without a positive id are skipped
-         ArrayList<N> jobVector = arr.get(jt.getId());
-         jobVector.remove(node);
-     }
- }
-
- /**
-  * As some jobs only write one output, all operations in the mapper need
-  * to be redone and cannot be marked as finished. If the input of an MMCJ
-  * job (executed in a mapper) is used by several lops, of which at least
-  * one is not finished, that input and the nodes below it are removed
-  * from <code>finishedNodes</code>.
-  *
-  * @param execNodes nodes of the current iteration, used to look up the MapAndReduce parent
-  * @param jobNodes job-specific node vectors, indexed by job type id
-  * @param finishedNodes finished nodes; entries may be removed by this method
-  * @throws LopsException
-  */
- private void handleSingleOutputJobs(ArrayList<N> execNodes,
-         ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
-         throws LopsException {
-     ArrayList<N> nodesWithUnfinishedOutputs = new ArrayList<N>();
-     int[] jobIndices = { JobType.MMCJ.getId() };
-     Lop.Type[] lopTypes = { Lop.Type.MMCJ };
-
-     // TODO: SortByValue should be treated similar to MMCJ, since it can
-     // only sort one file now
-
-     for (int k = 0; k < jobIndices.length; k++) {
-         ArrayList<N> vec = jobNodes.get(jobIndices[k]);
-         if (vec.isEmpty())
-             continue;
-
-         // pass 1: map-side nodes below the job-defining lop that have more
-         // than one parent, at least one of which is not finished
-         // (a node is recorded once per unfinished parent)
-         for (int i = 0; i < vec.size(); i++) {
-             N node = vec.get(i);
-             boolean mapSide = node.getExecLocation() == ExecLocation.MapOrReduce
-                     || node.getExecLocation() == ExecLocation.Map;
-             if (!mapSide)
-                 continue;
-
-             N mrParent = getParentNode(node, execNodes, ExecLocation.MapAndReduce);
-             if (mrParent == null || mrParent.getType() != lopTypes[k])
-                 continue;
-
-             if (node.getOutputs().size() <= 1)
-                 continue;
-             for (Lop parent : node.getOutputs()) {
-                 if (!finishedNodes.contains(parent))
-                     nodesWithUnfinishedOutputs.add(node);
-             }
-         }
-
-         // pass 2: the recorded nodes as well as their children must be redone
-         for (int i = 0; i < vec.size(); i++) {
-             N node = vec.get(i);
-             boolean mapSide = node.getExecLocation() == ExecLocation.MapOrReduce
-                     || node.getExecLocation() == ExecLocation.Map;
-             if (!mapSide)
-                 continue;
-
-             if (nodesWithUnfinishedOutputs.contains(node))
-                 finishedNodes.remove(node);
-
-             if (hasParentNode(node, nodesWithUnfinishedOutputs))
-                 finishedNodes.remove(node);
-         }
-     }
- }
-
- /**
-  * Checks whether a lop can be eliminated. Only "aligner" lops (such as
-  * group) qualify, and only if the alignment reported for the child whose
-  * execLoc is 'MapAndReduce' is not broken.
-  *
-  * @param node lop to check
-  * @param execNodes nodes of the current iteration
-  * @return true if the lop can be eliminated
-  */
- private boolean canEliminateLop(N node, ArrayList<N> execNodes) {
-     // this function can only eliminate "aligner" lops such a group
-     if (!node.isAligner())
-         return false;
-
-     // alignment of the child whose execLoc = 'MapAndReduce'
-     switch (getChildAlignment(node, execNodes, ExecLocation.MapAndReduce)) {
-         case CHILD_DOES_NOT_BREAK_ALIGNMENT:
-         case MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT:
-             return true;
-         case CHILD_BREAKS_ALIGNMENT:
-         case MRCHILD_NOT_FOUND:
-         case MR_CHILD_FOUND_BREAKS_ALIGNMENT:
-             return false;
-         default:
-             throw new RuntimeException("Should not happen. \n");
-     }
- }
-
-
- /**
-  * Generates the instructions that create symbol table entries for the
-  * input variables. One instruction is generated for every lop that is
-  * 1) of type Data,
-  * 2) persistent (not transient),
-  * 3) a read,
-  * 4) a matrix or a frame, and
-  * 5) not a literal.
-  *
-  * Transient reads needn't be considered here since the previous program
-  * block would already create appropriate entries in the symbol table.
-  *
-  * @param nodes_v nodes of the DAG
-  * @param inst list that receives the generated instructions
-  * @throws LopsException if an instruction cannot be parsed
-  * @throws IOException
-  */
- private void generateInstructionsForInputVariables(ArrayList<N> nodes_v, ArrayList<Instruction> inst) throws LopsException, IOException {
-     for (N lop : nodes_v) {
-         if (lop.getExecLocation() != ExecLocation.Data)
-             continue;
-
-         Data data = (Data) lop;
-         if (data.isTransient() || data.getOperationType() != OperationTypes.READ)
-             continue;
-         if (lop.getDataType() != DataType.MATRIX && lop.getDataType() != DataType.FRAME)
-             continue;
-         if (data.isLiteral())
-             continue;
-
-         try {
-             CPInstruction parsed = CPInstructionParser.parseSingleInstruction(lop.getInstructions());
-             parsed.setLocation(lop);
-             inst.add(parsed);
-         } catch (DMLUnsupportedOperationException e) {
-             throw new LopsException(lop.printErrorLocation() + "error generating instructions from input variables in Dag -- \n", e);
-         } catch (DMLRuntimeException e) {
-             throw new LopsException(lop.printErrorLocation() + "error generating instructions from input variables in Dag -- \n", e);
-         }
-     }
- }
-
- /*private String getOutputFormat(Data node) {
- if(node.getFileFormatType() == FileFormatTypes.TEXT)
- return "textcell";
- else if ( node.getOutputParameters().isBlocked_representation() )
- return "binaryblock";
- else
- return "binarycell";
- }*/
-
- /**
-  * Decides whether the write lop <code>node</code> is sent to MR or is
-  * processed in the control program. It is never sent to MR on the
-  * SINGLE_NODE platform, nor when a blocked transient read with a single
-  * output feeds a blocked persistent write. Otherwise it is sent to MR if
-  *
-  * 1) the exec type of the write lop itself is MR, or
-  *
-  * 2) the input lop is processed in MR and the write format is not CSV,
-  *    so that <code>node</code> can be piggybacked.
-  *
-  * @param node write lop to decide on
-  * @return true if the lop is to be sent to MR
-  */
- @SuppressWarnings("unchecked")
- private boolean sendWriteLopToMR(N node)
- {
-     if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE )
-         return false;
-
-     N input = (N) node.getInputs().get(0);
-     Format nodeFormat = node.getOutputParameters().getFormat();
-
-     // Case of a transient read feeding into only one output persistent binaryblock write
-     // Move the temporary file on HDFS to required persistent location, insteadof copying.
-     boolean transientReadIntoPersistentWrite =
-             input.getExecLocation() == ExecLocation.Data
-             && input.getOutputs().size() == 1
-             && !((Data)node).isTransient()
-             && ((Data)input).isTransient()
-             && ((Data)input).getOutputParameters().isBlocked()
-             && node.getOutputParameters().isBlocked();
-     if ( transientReadIntoPersistentWrite )
-         return false;
-
-     // (1) the write lop itself is marked with exec type MR, or
-     // (2) the input lop is in MR and the write format allows packing it into
-     //     the same job (not the case for csv: MR csvwrite is a separate MR job type)
-     return node.getExecType() == ExecType.MR
-             || (input.getExecType() == ExecType.MR && nodeFormat != Format.CSV);
- }
-
- /**
-  * Computes the memory footprint required to execute <code>node</code> in
-  * the mapper. Only nodes that use inputs from distributed cache are
-  * estimated; every other node yields 0. The returned value is utilized in
-  * limiting the number of instructions piggybacked onto a single GMR mapper.
-  *
-  * @param node lop to estimate
-  * @return the estimated footprint, or 0.0 when no estimate applies
-  */
- private double computeFootprintInMapper(N node) {
-     // Memory limits must be checked only for nodes that use distributed cache
-     if ( !node.usesDistributedCache() )
-         return 0.0;
-
-     OutputParameters left = node.getInputs().get(0).getOutputParameters();
-     OutputParameters right = node.getInputs().get(1).getOutputParameters();
-
-     if ( node instanceof MapMult ) {
-         int dcInputIndex = node.distributedCacheInputIndex()[0];
-         return AggBinaryOp.getMapmmMemEstimate(
-                 left.getNumRows(), left.getNumCols(), left.getRowsInBlock(), left.getColsInBlock(), left.getNnz(),
-                 right.getNumRows(), right.getNumCols(), right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(),
-                 dcInputIndex, false);
-     }
-
-     if ( node instanceof PMMJ ) {
-         // same estimate as above, but with a column count of 1 for the
-         // first input and the last flag set
-         int dcInputIndex = node.distributedCacheInputIndex()[0];
-         return AggBinaryOp.getMapmmMemEstimate(
-                 left.getNumRows(), 1, left.getRowsInBlock(), left.getColsInBlock(), left.getNnz(),
-                 right.getNumRows(), right.getNumCols(), right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(),
-                 dcInputIndex, true);
-     }
-
-     if ( node instanceof AppendM || node instanceof BinaryM ) {
-         // both lop types use the same estimate
-         return BinaryOp.footprintInMapper(
-                 left.getNumRows(), left.getNumCols(),
-                 right.getNumRows(), right.getNumCols(),
-                 left.getRowsInBlock(), left.getColsInBlock());
-     }
-
-     // default behavior
-     return 0.0;
- }
-
- /**
-  * Determines if <code>node</code> can be executed in the current round of
-  * MR jobs or if it needs to be queued for later rounds. Nodes that do not
-  * use distributed cache are always accepted. For the others, the given
-  * footprint must not exceed the memory budget of the mappers.
-  *
-  * @param node lop to check
-  * @param footprintInMapper estimated footprint to compare against the budget
-  * @return true if the node can be executed in the current round, false if it must be queued
-  */
- private boolean checkMemoryLimits(N node, double footprintInMapper) {
-     // Memory limits must be checked only for nodes that use distributed cache
-     if ( !node.usesDistributedCache() )
-         return true;
-
-     // the smaller of the two multipliers scales the remote map memory budget
-     double memBudget = Math.min(AggBinaryOp.MAPMULT_MEM_MULTIPLIER, BinaryOp.APPEND_MEM_MULTIPLIER) * OptimizerUtils.getRemoteMemBudgetMap(true);
-     return footprintInMapper <= memBudget;
- }
-
- /**
- * Method to group a vector of sorted lops into rounds of jobs; returns the instructions generated for all rounds.
- * @param sb statement block, handed to deleteUpdatedTransientReadVariables() and generateRemoveInstructions()
- * @param node_v lops to be grouped; scanned in list order on every pass of the grouping loop
- * @throws LopsException e.g., if a pass selects no lops for execution while lops are still queued
- * @throws DMLUnsupportedOperationException
- * @throws DMLRuntimeException
- */
-
- @SuppressWarnings("unchecked")
- private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
- throws LopsException, IOException, DMLRuntimeException,
- DMLUnsupportedOperationException
-
- {
- LOG.trace("Grouping DAG ============");
-
- // nodes to be executed in current iteration
- ArrayList<N> execNodes = new ArrayList<N>();
- // nodes that have already been processed
- ArrayList<N> finishedNodes = new ArrayList<N>();
- // nodes that are queued for the following iteration
- ArrayList<N> queuedNodes = new ArrayList<N>();
-
- ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
-
-
- // list of instructions; this is the list that is returned to the caller
- ArrayList<Instruction> inst = new ArrayList<Instruction>();
- // the lists below are appended to <code>inst</code> only after all rounds are done (writes, deletes, end-of-block, in that order)
- //ArrayList<Instruction> preWriteDeleteInst = new ArrayList<Instruction>();
- ArrayList<Instruction> writeInst = new ArrayList<Instruction>();
- ArrayList<Instruction> deleteInst = new ArrayList<Instruction>();
- ArrayList<Instruction> endOfBlockInst = new ArrayList<Instruction>();
-
- // delete transient variables that are no longer needed
- //deleteUnwantedTransientReadVariables(node_v, deleteInst);
-
- // remove files for transient reads that are updated.
- deleteUpdatedTransientReadVariables(sb, node_v, writeInst);
-
- generateRemoveInstructions(sb, endOfBlockInst);
-
- generateInstructionsForInputVariables(node_v, inst);
-
-
- boolean done = false;
- String indent = " ";
- // each pass of this loop builds one round: every unfinished lop is either added to execNodes or queued for a later pass
- while (!done) {
- LOG.trace("Grouping nodes in DAG");
-
- execNodes.clear();
- queuedNodes.clear();
- clearNodeVectors(jobNodes);
- gmrMapperFootprint=0;
- // (gmrMapperFootprint accumulates the mapper footprint of distributed-cache lops that are added during this pass)
- for (int i = 0; i < node_v.size(); i++) {
- N node = node_v.get(i);
-
- // finished nodes don't need to be processed
-
- if (finishedNodes.contains(node))
- continue;
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Processing node (" + node.getID()
- + ") " + node.toString() + " exec nodes size is " + execNodes.size());
-
-
- //if node defines MR job, make sure it is compatible with all
- //its children nodes in execNodes
- if(node.definesMRJob() && !compatibleWithChildrenInExecNodes(execNodes, node))
- {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing node "
- + node.toString() + " (code 1)");
-
- queuedNodes.add(node);
- removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes);
- continue;
- }
-
- // if child is queued, this node will be processed in the later
- // iteration
- if (hasChildNode(node,queuedNodes)) {
-
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing node "
- + node.toString() + " (code 2)");
- //for(N q: queuedNodes) {
- // LOG.trace(indent + " " + q.getType() + "," + q.getID());
- //}
- queuedNodes.add(node);
-
- // if node has more than two inputs,
- // remove children that will be needed in a future
- // iterations
- // may also have to remove parent nodes of these children
- removeNodesForNextIteration(node, finishedNodes, execNodes,
- queuedNodes, jobNodes);
-
- continue;
- }
-
- // if inputs come from different jobs, then queue (inputs for which jobType() returns -1 are ignored in this comparison)
- if ( node.getInputs().size() >= 2) {
- int jobid = Integer.MIN_VALUE;
- boolean queueit = false;
- for(int idx=0; idx < node.getInputs().size(); idx++) {
- int input_jobid = jobType(node.getInputs().get(idx), jobNodes);
- if (input_jobid != -1) {
- if ( jobid == Integer.MIN_VALUE )
- jobid = input_jobid;
- else if ( jobid != input_jobid ) {
- queueit = true;
- break;
- }
- }
- }
- if ( queueit ) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing node " + node.toString() + " (code 3)");
- queuedNodes.add(node);
- removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes);
- continue;
- }
- }
-
- /*if (node.getInputs().size() == 2) {
- int j1 = jobType(node.getInputs().get(0), jobNodes);
- int j2 = jobType(node.getInputs().get(1), jobNodes);
- if (j1 != -1 && j2 != -1 && j1 != j2) {
- LOG.trace(indent + "Queueing node "
- + node.toString() + " (code 3)");
-
- queuedNodes.add(node);
-
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
-
- continue;
- }
- }*/
-
- // See if this lop can be eliminated
- // This check is for "aligner" lops (e.g., group)
- boolean eliminate = false;
- eliminate = canEliminateLop(node, execNodes);
- if (eliminate) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding -"+ node.toString());
- execNodes.add(node);
- finishedNodes.add(node);
- addNodeByJobType(node, jobNodes, execNodes, eliminate);
- continue;
- }
-
- // If the node defines a MR Job then make sure none of its
- // children that defines a MR Job are present in execNodes
- if (node.definesMRJob()) {
- if (hasMRJobChildNode(node, execNodes)) {
- // "node" must NOT be queued when node=group and the child that defines job is Rand
- // this is because "group" can be pushed into the "Rand" job.
- if (! (node.getType() == Lop.Type.Grouping && checkDataGenAsChildNode(node,execNodes)) ) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing node " + node.toString() + " (code 4)");
-
- queuedNodes.add(node);
-
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
-
- continue;
- }
- }
- }
-
- // if "node" has more than one input, and has a descendant lop
- // in execNodes that is of type RecordReader
- // then all its inputs must be ancestors of RecordReader. If
- // not, queue "node"
- if (node.getInputs().size() > 1
- && hasChildNode(node, execNodes, ExecLocation.RecordReader)) {
- // get the actual RecordReader lop
- N rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader);
-
- // all inputs of "node" must be ancestors of rr_node
- boolean queue_it = false;
- for (int in = 0; in < node.getInputs().size(); in++) {
- // each input should be ancestor of RecordReader lop
- N n = (N) node.getInputs().get(in);
- if (!n.equals(rr_node) && !isChild(rr_node, n, IDMap)) {
- queue_it = true; // i.e., "node" must be queued
- break;
- }
- }
-
- if (queue_it) {
- // queue node
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing -" + node.toString() + " (code 5)");
- queuedNodes.add(node);
- // TODO: does this have to be modified to handle
- // recordreader lops?
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
- continue;
- } else {
- // nothing here.. subsequent checks have to be performed
- // on "node"
- ;
- }
- }
-
- // data node, always add if child not queued
- // only write nodes are kept in execnodes
- if (node.getExecLocation() == ExecLocation.Data) {
- Data dnode = (Data) node;
- boolean dnode_queued = false;
-
- if ( dnode.getOperationType() == OperationTypes.READ ) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding Data -"+ node.toString());
-
- // TODO: avoid readScalar instruction, and read it on-demand just like the way Matrices are read in control program
- if ( node.getDataType() == DataType.SCALAR
- //TODO: LEO check the following condition is still needed
- && node.getOutputParameters().getFile_name() != null ) {
- // this lop corresponds to reading a scalar from HDFS file
- // add it to execNodes so that "readScalar" instruction gets generated
- execNodes.add(node);
- // note: no need to add it to any job vector
- }
- }
- else if (dnode.getOperationType() == OperationTypes.WRITE) {
- // Skip the transient write <code>node</code> if the input is a
- // transient read with the same variable name. i.e., a dummy copy.
- // Hence, <code>node</code> can be avoided.
- // TODO: this case should ideally be handled in the language layer
- // prior to the construction of Hops Dag
- N input = (N) dnode.getInputs().get(0);
- if ( dnode.isTransient()
- && input.getExecLocation() == ExecLocation.Data
- && ((Data)input).isTransient()
- && dnode.getOutputParameters().getLabel().compareTo(input.getOutputParameters().getLabel()) == 0 ) {
- // do nothing, <code>node</code> must not processed any further.
- ;
- }
- else if ( execNodes.contains(input) && !isCompatible(node, input) && sendWriteLopToMR(node)) {
- // input is in execNodes but it is not compatible with write lop. So, queue the write lop.
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing -" + node.toString());
- queuedNodes.add(node);
- dnode_queued = true;
- }
- else {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding Data -"+ node.toString());
-
- execNodes.add(node);
- if ( sendWriteLopToMR(node) ) {
- addNodeByJobType(node, jobNodes, execNodes, false);
- }
- }
- }
- if (!dnode_queued)
- finishedNodes.add(node);
-
- continue;
- }
-
- // map or reduce node, can always be piggybacked with parent
- if (node.getExecLocation() == ExecLocation.MapOrReduce) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding -"+ node.toString());
- execNodes.add(node);
- finishedNodes.add(node);
- addNodeByJobType(node, jobNodes, execNodes, false);
-
- continue;
- }
-
- // RecordReader node, add, if no parent needs reduce, else queue
- if (node.getExecLocation() == ExecLocation.RecordReader) {
- // "node" should not have any children in
- // execNodes .. it has to be the first one in the job!
- if (!hasChildNode(node, execNodes, ExecLocation.Map)
- && !hasChildNode(node, execNodes,
- ExecLocation.MapAndReduce)) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding -"+ node.toString());
- execNodes.add(node);
- finishedNodes.add(node);
- addNodeByJobType(node, jobNodes, execNodes, false);
- } else {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing -"+ node.toString() + " (code 6)");
- queuedNodes.add(node);
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
-
- }
- continue;
- }
-
- // map node, add, if no parent needs reduce, else queue
- if (node.getExecLocation() == ExecLocation.Map) {
- boolean queueThisNode = false;
- int subcode = -1;
- if ( node.usesDistributedCache() ) {
- // if an input to <code>node</code> comes from distributed cache
- // then that input must get executed in one of the previous jobs.
- int[] dcInputIndexes = node.distributedCacheInputIndex();
- for( int dcInputIndex : dcInputIndexes ){
- N dcInput = (N) node.getInputs().get(dcInputIndex-1);
- if ( (dcInput.getType() != Lop.Type.Data && dcInput.getExecType()==ExecType.MR)
- && execNodes.contains(dcInput) )
- {
- queueThisNode = true;
- subcode = 1;
- }
- }
-
- // Limit the number of distributed cache inputs based on the available memory in mappers (the check is skipped while gmrMapperFootprint is still 0)
- double memsize = computeFootprintInMapper(node);
- //gmrMapperFootprint += computeFootprintInMapper(node);
- if ( gmrMapperFootprint>0 && !checkMemoryLimits(node, gmrMapperFootprint+memsize ) ) {
- queueThisNode = true;
- subcode = 2;
- }
- if(!queueThisNode)
- gmrMapperFootprint += memsize;
- }
- if (!queueThisNode && !hasChildNode(node, execNodes,ExecLocation.MapAndReduce)&& !hasMRJobChildNode(node, execNodes)) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding -"+ node.toString());
- execNodes.add(node);
- finishedNodes.add(node);
- addNodeByJobType(node, jobNodes, execNodes, false);
- } else {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing -"+ node.toString() + " (code 7 - " + "subcode " + subcode + ")");
- queuedNodes.add(node);
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
-
- }
- continue;
- }
-
- // reduce node (NOTE: the queueing condition below is currently commented out, so the lop is always added)
- if (node.getExecLocation() == ExecLocation.MapAndReduce) {
-
- // boolean eliminate = false;
- // eliminate = canEliminateLop(node, execNodes);
- // if (eliminate || (!hasChildNode(node, execNodes,
- // ExecLocation.MapAndReduce)) &&
- // !hasMRJobChildNode(node,execNodes)) {
-
- // TODO: statiko -- keep the middle condition
- // discuss about having a lop that is MapAndReduce but does
- // not define a job
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding -"+ node.toString());
- execNodes.add(node);
- finishedNodes.add(node);
- addNodeByJobType(node, jobNodes, execNodes, eliminate);
- // (eliminate is always false here, since lops that can be eliminated are handled and skipped earlier in this loop body)
- // } else {
- // if (DEBUG)
- // System.out.println("Queueing -" + node.toString());
- // queuedNodes.add(node);
- // removeNodesForNextIteration(node, finishedNodes,
- // execNodes, queuedNodes, jobNodes);
- // }
- continue;
- }
-
- // aligned reduce, make sure a parent that is reduce exists
- if (node.getExecLocation() == ExecLocation.Reduce) {
- if ( compatibleWithChildrenInExecNodes(execNodes, node) &&
- (hasChildNode(node, execNodes, ExecLocation.MapAndReduce)
- || hasChildNode(node, execNodes, ExecLocation.Map) ) )
- {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding -"+ node.toString());
- execNodes.add(node);
- finishedNodes.add(node);
- addNodeByJobType(node, jobNodes, execNodes, false);
- } else {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing -"+ node.toString() + " (code 8)");
- queuedNodes.add(node);
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
- }
-
- continue;
-
- }
-
- // add Scalar to execNodes if it has no child in exec nodes
- // that will be executed in a MR job.
- if (node.getExecLocation() == ExecLocation.ControlProgram) {
- for (int j = 0; j < node.getInputs().size(); j++) {
- if (execNodes.contains(node.getInputs().get(j))
- && !(node.getInputs().get(j).getExecLocation() == ExecLocation.Data)
- && !(node.getInputs().get(j).getExecLocation() == ExecLocation.ControlProgram)) {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
-
- queuedNodes.add(node);
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
- break;
- }
- }
-
- if (queuedNodes.contains(node))
- continue;
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Adding - scalar"+ node.toString());
- execNodes.add(node);
- addNodeByJobType(node, jobNodes, execNodes, false);
- finishedNodes.add(node);
- continue;
- }
-
- }
-
- // no work to do
- if ( execNodes.isEmpty() ) {
-
- if( !queuedNodes.isEmpty() )
- {
- // NOTE(review): raised when lops are still queued although nothing could be scheduled; the message text below reads inverted
- throw new LopsException("Queued nodes should not be 0 at this point \n");
- }
-
- if( LOG.isTraceEnabled() )
- LOG.trace("All done! queuedNodes = "+ queuedNodes.size());
-
- done = true;
- } else {
- // work to do
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Generating jobs for group -- Node count="+ execNodes.size());
-
- // first process scalar instructions
- generateControlProgramJobs(execNodes, inst, writeInst, deleteInst);
-
- // copy unassigned lops in execnodes to gmrnodes
- for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.get(i);
- if (jobType(node, jobNodes) == -1) {
- if ( isCompatible(node, JobType.GMR) ) {
- if ( node.hasNonBlockedInputs() ) {
- jobNodes.get(JobType.GMRCELL.getId()).add(node);
- addChildren(node, jobNodes.get(JobType.GMRCELL.getId()), execNodes);
- }
- else {
- jobNodes.get(JobType.GMR.getId()).add(node);
- addChildren(node, jobNodes.get(JobType.GMR.getId()), execNodes);
- }
- }
- else {
- if( LOG.isTraceEnabled() )
- LOG.trace(indent + "Queueing -" + node.toString() + " (code 10)");
- execNodes.remove(i); // NOTE(review): i is not decremented, so the lop that shifts into position i is not visited by this loop -- confirm this is intended
- finishedNodes.remove(node);
- queuedNodes.add(node);
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
- }
- }
- }
-
- // next generate MR instructions
- if (!execNodes.isEmpty())
- generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes);
-
- handleSingleOutputJobs(execNodes, jobNodes, finishedNodes);
-
- }
-
- }
-
- // add write and delete inst at the very end.
-
- //inst.addAll(preWriteDeleteInst);
- inst.addAll(writeInst);
- inst.addAll(deleteInst);
- inst.addAll(endOfBlockInst);
-
- return inst;
-
- }
-
- private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
-     // Reject node as soon as one of its descendants in execNodes shares no job type with it.
-     for (N candidate : execNodes) {
-         if (!isChild(candidate, node, IDMap))
-             continue;
-         // for lops that execute in control program, compatibleJobs property is set to
-         // LopProperties.INVALID; such lops must not take part in this check
-         if (candidate.getExecLocation() == ExecLocation.ControlProgram)
-             continue;
-         if ((candidate.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
-             return false;
-     }
-     return true;
- }
-
- /**
-  * Exclude all rmvar instructions for <varname> from deleteInst, if any exist.
-  *
-  * @param varName name of the variable whose pending rmvar instructions are dropped
-  * @param deleteInst list of deferred instructions; matching entries are removed in place
-  */
- private void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
-     // Walk the list backwards: ArrayList.remove(int) shifts all later elements one
-     // position to the left, so a forward index loop would silently skip the element
-     // that follows each removed entry (e.g., a second rmvar for the same variable).
-     for(int i=deleteInst.size()-1; i >= 0; i--) {
-         Instruction inst = deleteInst.get(i);
-         if ((inst.getType() == INSTRUCTION_TYPE.CONTROL_PROGRAM || inst.getType() == INSTRUCTION_TYPE.SPARK)
-                 && ((CPInstruction)inst).getCPInstructionType() == CPINSTRUCTION_TYPE.Variable
-                 && ((VariableCPInstruction)inst).isRemoveVariable(varName) ) {
-             deleteInst.remove(i);
-         }
-     }
- }
-
- /**
-  * Reduces the consumer count of every input lop of <code>node</code> and generates
-  * rmvar instructions for those inputs whose consumer count becomes zero.
-  *
-  * @param node lop whose inputs are processed
-  * @param inst list that receives the generated rmvar instructions
-  * @param delteInst list of deferred instructions, passed through to processConsumers()
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- @SuppressWarnings("unchecked")
- private void processConsumersForInputs(N node, ArrayList<Instruction> inst, ArrayList<Instruction> delteInst) throws DMLRuntimeException, DMLUnsupportedOperationException {
-     for(Lop in : node.getInputs() ) {
-         // In debug mode the consuming lop is handed over as location info;
-         // otherwise null is passed and processConsumers() uses the input lop itself.
-         N locationInfo = DMLScript.ENABLE_DEBUG_MODE ? node : null;
-         processConsumers((N)in, inst, delteInst, locationInfo);
-     }
- }
-
- private void processConsumers(N node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst, N locationInfo) throws DMLRuntimeException, DMLUnsupportedOperationException {
-     // reduce the consumer count of node; as long as the count is non-zero
-     // the variable associated with node is still needed
-     if ( node.removeConsumer() != 0 )
-         return;
-
-     // no rmvar instruction is generated for literals
-     boolean isLiteral = node.getExecLocation() == ExecLocation.Data && ((Data)node).isLiteral();
-     if ( isLiteral )
-         return;
-
-     String label = node.getOutputParameters().getLabel();
-     Instruction rmvar = VariableCPInstruction.prepareRemoveInstruction(label);
-     // use the explicitly given location if there is one, else the lop itself
-     rmvar.setLocation( (locationInfo != null) ? locationInfo : node );
-
-     inst.add(rmvar);
-     // the variable is removed right here, so a deferred rmvar for it is not needed
-     excludeRemoveInstruction(label, deleteInst);
- }
-
- /**
- * Method to generate instructions that are executed in Control Program. At this point, this DAG
- * has no dependencies on the MR dag, i.e., none of the inputs are outputs of MR jobs.
- *
- * @param execNodes lops selected for the current round; lops that are handled here are removed from this list
- * @param inst list that receives the instructions that are to be executed immediately
- * @param writeInst list that receives the instructions generated for scalar writes
- * @param deleteInst list that receives the deferred instructions (e.g., rmvar)
- * @throws LopsException e.g., on an unsupported number of inputs, a failure to parse the generated instruction, or a matrix READ
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
-
- @SuppressWarnings("unchecked")
- private void generateControlProgramJobs(ArrayList<N> execNodes,
- ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
- DMLUnsupportedOperationException, DMLRuntimeException {
-
- // nodes to be deleted from execnodes
- ArrayList<N> markedNodes = new ArrayList<N>();
-
- // variable names to be deleted (outputs that feed a transient write; their rmvar is generated after the main loop)
- ArrayList<String> var_deletions = new ArrayList<String>();
- HashMap<String, N> var_deletionsLineNum = new HashMap<String, N>();
- // doRmVar: whether rmvar instructions may be generated for the inputs of the lop processed in the current iteration
- boolean doRmVar = false;
-
- for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.get(i);
- doRmVar = false;
-
- // mark input scalar read nodes for deletion
- // TODO: statiko -- check if this condition ever evaluated to TRUE
- if (node.getExecLocation() == ExecLocation.Data
- && ((Data) node).getOperationType() == Data.OperationTypes.READ
- && ((Data) node).getDataType() == DataType.SCALAR
- && node.getOutputParameters().getFile_name() == null ) {
- markedNodes.add(node);
- continue;
- }
-
- // output scalar instructions and mark nodes for deletion
- if (node.getExecLocation() == ExecLocation.ControlProgram) {
-
- if (node.getDataType() == DataType.SCALAR) {
- // Output from lops with SCALAR data type must
- // go into Temporary Variables (Var0, Var1, etc.)
- NodeOutput out = setupNodeOutputs(node, ExecType.CP, false, false);
- inst.addAll(out.getPreInstructions()); // dummy
- deleteInst.addAll(out.getLastInstructions());
- } else {
- // Output from lops with non-SCALAR data type must
- // go into Temporary Files (temp0, temp1, etc.)
-
- NodeOutput out = setupNodeOutputs(node, ExecType.CP, false, false);
- inst.addAll(out.getPreInstructions());
-
- boolean hasTransientWriteParent = false;
- for ( int pid=0; pid < node.getOutputs().size(); pid++ ) {
- N parent = (N)node.getOutputs().get(pid);
- if ( parent.getExecLocation() == ExecLocation.Data
- && ((Data)parent).getOperationType() == Data.OperationTypes.WRITE
- && ((Data)parent).isTransient() ) {
- hasTransientWriteParent = true;
- break;
- }
- }
-
- if ( !hasTransientWriteParent ) {
- deleteInst.addAll(out.getLastInstructions());
- }
- else {
- var_deletions.add(node.getOutputParameters().getLabel());
- var_deletionsLineNum.put(node.getOutputParameters().getLabel(), node);
-
- //System.out.println(" --> skipping " + out.getLastInstructions() + " while processing node " + node.getID());
- }
- }
-
- String inst_string = "";
-
- // Lops with arbitrary number of inputs (ParameterizedBuiltin, GroupedAggregate, DataGen)
- // are handled separately, by simply passing ONLY the output variable to getInstructions()
- if (node.getType() == Lop.Type.ParameterizedBuiltin
- || node.getType() == Lop.Type.GroupedAgg
- || node.getType() == Lop.Type.DataGen ){
- inst_string = node.getInstructions(node.getOutputParameters().getLabel());
- }
-
- // Lops with arbitrary number of inputs and outputs are handled
- // separately as well by passing arrays of inputs and outputs
- else if ( node.getType() == Lop.Type.FunctionCallCP )
- {
- String[] inputs = new String[node.getInputs().size()];
- String[] outputs = new String[node.getOutputs().size()];
- int count = 0;
- for( Lop in : node.getInputs() )
- inputs[count++] = in.getOutputParameters().getLabel();
- count = 0;
- for( Lop out : node.getOutputs() )
- {
- outputs[count++] = out.getOutputParameters().getLabel();
- }
-
- inst_string = node.getInstructions(inputs, outputs);
- }
- else {
- if ( node.getInputs().isEmpty() ) {
- // currently, such a case exists only for Rand lop
- inst_string = node.getInstructions(node.getOutputParameters().getLabel());
- }
- else if (node.getInputs().size() == 1) {
- inst_string = node.getInstructions(node.getInputs()
- .get(0).getOutputParameters().getLabel(),
- node.getOutputParameters().getLabel());
- }
- else if (node.getInputs().size() == 2) {
- inst_string = node.getInstructions(
- node.getInputs().get(0).getOutputParameters().getLabel(),
- node.getInputs().get(1).getOutputParameters().getLabel(),
- node.getOutputParameters().getLabel());
- }
- else if (node.getInputs().size() == 3 || node.getType() == Type.Ternary) {
- inst_string = node.getInstructions(
- node.getInputs().get(0).getOutputParameters().getLabel(),
- node.getInputs().get(1).getOutputParameters().getLabel(),
- node.getInputs().get(2).getOutputParameters().getLabel(),
- node.getOutputParameters().getLabel());
- }
- else if (node.getInputs().size() == 4) {
- inst_string = node.getInstructions(
- node.getInputs().get(0).getOutputParameters().getLabel(),
- node.getInputs().get(1).getOutputParameters().getLabel(),
- node.getInputs().get(2).getOutputParameters().getLabel(),
- node.getInputs().get(3).getOutputParameters().getLabel(),
- node.getOutputParameters().getLabel());
- }
- else if (node.getInputs().size() == 5) {
- inst_string = node.getInstructions(
- node.getInputs().get(0).getOutputParameters().getLabel(),
- node.getInputs().get(1).getOutputParameters().getLabel(),
- node.getInputs().get(2).getOutputParameters().getLabel(),
- node.getInputs().get(3).getOutputParameters().getLabel(),
- node.getInputs().get(4).getOutputParameters().getLabel(),
- node.getOutputParameters().getLabel());
- }
- else if (node.getInputs().size() == 6) {
- inst_string = node.getInstructions(
- node.getInputs().get(0).getOutputParameters().getLabel(),
- node.getInputs().get(1).getOutputParameters().getLabel(),
- node.getInputs().get(2).getOutputParameters().getLabel(),
- node.getInputs().get(3).getOutputParameters().getLabel(),
- node.getInputs().get(4).getOutputParameters().getLabel(),
- node.getInputs().get(5).getOutputParameters().getLabel(),
- node.getOutputParameters().getLabel());
- }
- else if (node.getInputs().size() == 7) {
- inst_string = node.getInstructions(
- node.getInputs().get(0).getOutputParameters().getLabel(),
- node.getInputs().get(1).getOutputParameters().getLabel(),
- node.getInputs().get(2).getOutputParameters().getLabel(),
- node.getInputs().get(3).getOutputParameters().getLabel(),
- node.getInputs().get(4).getOutputParameters().getLabel(),
- node.getInputs().get(5).getOutputParameters().getLabel(),
- node.getInputs().get(6).getOutputParameters().getLabel(),
- node.getOutputParameters().getLabel());
- }
-
- else {
- throw new LopsException(node.printErrorLocation() + "Node with " + node.getInputs().size() + " inputs is not supported in CP yet! \n");
- }
- }
-
- try {
- if( LOG.isTraceEnabled() )
- LOG.trace("Generating instruction - "+ inst_string);
- Instruction currInstr = InstructionParser.parseSingleInstruction(inst_string);
- if (node._beginLine != 0)
- currInstr.setLocation(node);
- else if ( !node.getOutputs().isEmpty() )
- currInstr.setLocation(node.getOutputs().get(0));
- else if ( !node.getInputs().isEmpty() )
- currInstr.setLocation(node.getInputs().get(0));
-
- inst.add(currInstr);
- } catch (Exception e) {
- throw new LopsException(node.printErrorLocation() + "Problem generating simple inst - "
- + inst_string, e);
- }
-
- markedNodes.add(node);
- doRmVar = true;
- //continue;
- }
- else if (node.getExecLocation() == ExecLocation.Data ) {
- Data dnode = (Data)node;
- Data.OperationTypes op = dnode.getOperationType();
-
- if ( op == Data.OperationTypes.WRITE ) {
- NodeOutput out = null;
- if ( sendWriteLopToMR(node) ) {
- // In this case, Data WRITE lop goes into MR, and
- // we don't have to do anything here
- doRmVar = false;
- }
- else {
- out = setupNodeOutputs(node, ExecType.CP, false, false);
- if ( dnode.getDataType() == DataType.SCALAR ) {
- // processing is same for both transient and persistent scalar writes
- writeInst.addAll(out.getLastInstructions());
- //inst.addAll(out.getLastInstructions());
- doRmVar = false;
- }
- else {
- // setupNodeOutputs() handles both transient and persistent matrix writes
- if ( dnode.isTransient() ) {
- //inst.addAll(out.getPreInstructions()); // dummy ?
- deleteInst.addAll(out.getLastInstructions());
- doRmVar = false;
- }
- else {
- // In case of persistent write lop, write instruction will be generated
- // and that instruction must be added to <code>inst</code> so that it gets
- // executed immediately. If it is added to <code>deleteInst</code> then it
- // gets executed at the end of program block's execution
- inst.addAll(out.getLastInstructions());
- doRmVar = true;
- }
- }
- markedNodes.add(node);
- //continue;
- }
- }
- else {
- // generate a temp label to hold the value that is read from HDFS
- if ( node.getDataType() == DataType.SCALAR ) {
- node.getOutputParameters().setLabel(Lop.SCALAR_VAR_NAME_PREFIX + var_index.getNextID());
- String io_inst = node.getInstructions(node.getOutputParameters().getLabel(),
- node.getOutputParameters().getFile_name());
- CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(io_inst);
- currInstr.setLocation(node);
-
- inst.add(currInstr);
-
- Instruction tempInstr = VariableCPInstruction.prepareRemoveInstruction(node.getOutputParameters().getLabel());
- tempInstr.setLocation(node);
- deleteInst.add(tempInstr);
- }
- else {
- throw new LopsException("Matrix READs are not handled in CP yet!");
- }
- markedNodes.add(node);
- doRmVar = true;
- //continue;
- }
- }
-
- // see if rmvar instructions can be generated for node's inputs
- if(doRmVar)
- processConsumersForInputs(node, inst, deleteInst);
- doRmVar = false;
- }
- // rmvar instructions for outputs that feed a transient write were held back in the loop above; generate them now
- for ( String var : var_deletions ) {
- Instruction rmInst = VariableCPInstruction.prepareRemoveInstruction(var);
- if( LOG.isTraceEnabled() )
- LOG.trace(" Adding var_deletions: " + rmInst.toString());
-
- rmInst.setLocation(var_deletionsLineNum.get(var));
-
- deleteInst.add(rmInst);
- }
-
- // delete all marked nodes
- for (int i = 0; i < markedNodes.size(); i++) {
- execNodes.remove(markedNodes.get(i));
- }
-
- }
-
- /**
- * Method to remove all child nodes of a queued node that should be executed
- * in a following iteration.
- *
- * @param node
- * @param finishedNodes
- * @param execNodes
- * @param queuedNodes
- * @throws LopsException
- */
-
- private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
- ArrayList<N> execNodes, ArrayList<N> queuedNodes,
- ArrayList<ArrayList<N>> jobvec) throws LopsException {
-
- // only queued nodes with multiple inputs need to be handled.
- if (node.getInputs().size() == 1)
- return;
-
- //if all children are queued, then there is nothing to do.
- int numInputs = node.getInputs().size();
- boolean allQueued = true;
- for(int i=0; i < numInputs; i++) {
- if( !queuedNodes.contains(node.getInputs().get(i)) ) {
- allQueued = false;
- break;
- }
- }
- if ( allQueued )
- return;
-
- if( LOG.isTraceEnabled() )
- LOG.trace(" Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
-
- // Determine if <code>node</code> has inputs from the same job or multiple jobs
- int jobid = Integer.MIN_VALUE;
- boolean inputs_in_same_job = true;
- for(int idx=0; idx < node.getInputs().size(); idx++) {
- int input_jobid = jobType(node.getInputs().get(idx), jobvec);
- if ( jobid == Integer.MIN_VALUE )
- jobid = input_jobid;
- else if ( jobid != input_jobid ) {
- inputs_in_same_job = false;
- break;
- }
- }
-
- // Determine if there exist any unassigned inputs to <code>node</code>
- // Evaluate only those lops that execute in MR.
- boolean unassigned_inputs = false;
- for(int i=0; i < numInputs; i++) {
- Lop input = node.getInputs().get(i);
- //if ( input.getExecLocation() != ExecLocation.ControlProgram && jobType(input, jobvec) == -1 ) {
- if ( input.getExecType() == ExecType.MR && !execNodes.contains(input)) { //jobType(input, jobvec) == -1 ) {
- unassigned_inputs = true;
- break;
- }
- }
-
- // Determine if any node's children are queued
- boolean child_queued = false;
- for(int i=0; i < numInputs; i++) {
- if (queuedNodes.contains(node.getInputs().get(i)) ) {
- child_queued = true;
- break;
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(" Property Flags:");
- LOG.trace(" Inputs in same job: " + inputs_in_same_job);
- LOG.trace(" Unassigned inputs: " + unassigned_inputs);
- LOG.trace(" Child queued: " + child_queued);
- }
-
- // Evaluate each lop in <code>execNodes</code> for removal.
- // Add lops to be removed to <code>markedNodes</code>.
-
- ArrayList<N> markedNodes = new ArrayList<N>();
- for (int i = 0; i < execNodes.size(); i++) {
-
- N tmpNode = execNodes.get(i);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace(" Checking for removal (" + tmpNode.getID() + ") " + tmpNode.toString());
- }
-
- // if tmpNode is not a descendant of 'node', then there is no advantage in removing tmpNode for later iterations.
- if(!isChild(tmpNode, node, IDMap))
- continue;
-
- // handle group input lops
- if(node.getInputs().contains(tmpNode) && tmpNode.isAligner()) {
- markedNodes.add(tmpNode);
- if( LOG.isTraceEnabled() )
- LOG.trace(" Removing for next iteration (code 1): (" + tmpNode.getID() + ") " + tmpNode.toString());
- }
-
- //if (child_queued) {
- // if one of the children are queued,
- // remove some child nodes on other leg that may be needed later on.
- // For e.g. Group lop.
-
- if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node)
- && branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-
- boolean queueit = false;
- int code = -1;
- switch(node.getExecLocation()) {
- case Map:
- if(branchCanBePiggyBackedMap(tmpNode, node, execNodes, queuedNodes, markedNodes))
- queueit = true;
- code=2;
- break;
-
- case MapAndReduce:
- if(branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, queuedNodes)&& !tmpNode.definesMRJob())
- queueit = true;
- code=3;
- break;
- case Reduce:
- if(branchCanBePiggyBackedReduce(tmpNode, node, execNodes, queuedNodes))
- queueit = true;
- code=4;
- break;
- default:
- //do nothing
- }
-
- if(queueit) {
- if( LOG.isTraceEnabled() )
- LOG.trace(" Removing for next iteration (code " + code + "): (" + tmpNode.getID() + ") " + tmpNode.toString());
-
- markedNodes.add(tmpNode);
- }
- }
- /*
- * "node" has no other queued children.
- *
- * If inputs are in the same job and "node" is of type
- * MapAndReduce, then remove nodes of all types other than
- * Reduce, MapAndReduce, and the ones that define a MR job as
- * they can be piggybacked later.
- *
- * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
- * come from Rand job, and they should not be removed.
- *
- * Other examples: -- MMCJ whose children are of type
- * MapAndReduce (say GMR) -- Inputs coming from two different
- * jobs .. GMR & REBLOCK
- */
- //boolean himr = hasOtherMapAndReduceParentNode(tmpNode, execNodes,node);
- //boolean bcbp = branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes);
- //System.out.println(" .. " + inputs_in_same_job + "," + himr + "," + bcbp);
- if ((inputs_in_same_job || unassigned_inputs)
- && node.getExecLocation() == ExecLocation.MapAndReduce
- && !hasOtherMapAndReduceParentNode(tmpNode, execNodes,node) // don't remove since it already piggybacked with a MapReduce node
- && branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, queuedNodes)
- && !tmpNode.definesMRJob()) {
- if( LOG.isTraceEnabled() )
- LOG.trace(" Removing for next iteration (code 5): ("+ tmpNode.getID() + ") " + tmpNode.toString());
-
- markedNodes.add(tmpNode);
- }
- } // for i
-
- // we also need to delete all parent nodes of marked nodes
- for (int i = 0; i < execNodes.size(); i++) {
- LOG.trace(" Checking for removal - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
-
- if (hasChildNode(execNodes.get(i), markedNodes) && !markedNodes.contains(execNodes.get(i))) {
- markedNodes.add(execNodes.get(i));
- LOG.trace(" Removing for next iteration (code 6) (" + execNodes.get(i).getID() + ") " + execNodes.get(i).toString());
- }
- }
-
- if ( execNodes.size() != markedNodes.size() ) {
- // delete marked nodes from finishedNodes and execNodes
- // add to queued nodes
- for(N n : markedNodes) {
- if ( n.usesDistributedCache() )
- gmrMapperFootprint -= computeFootprintInMapper(n);
- finishedNodes.remove(n);
- execNodes.remove(n);
- removeNodeByJobType(n, jobvec);
- queuedNodes.add(n);
- }
- }
- /*for (int i = 0; i < markedNodes.size(); i++) {
- finishedNodes.remove(markedNodes.elementAt(i));
- execNodes.remove(markedNodes.elementAt(i));
- removeNodeByJobType(markedNodes.elementAt(i), jobvec);
- queuedNodes.add(markedNodes.elementAt(i));
- }*/
- }
-
- /**
- * Legacy variant (note the OLD suffix and the "unused" suppression) of the logic
- * that selects lops in <code>execNodes</code> for removal and moves them to
- * <code>queuedNodes</code>.
- *
- * The method only acts when <code>node</code> has exactly two inputs and at most
- * one of them is already queued. Lops marked in the main loop, plus every lop in
- * <code>execNodes</code> for which <code>hasChildNode(lop, markedNodes)</code> holds,
- * are removed from <code>finishedNodes</code> and <code>execNodes</code>, removed
- * from <code>jobvec</code> via <code>removeNodeByJobType</code>, and appended to
- * <code>queuedNodes</code>.
- *
- * @param node lop whose two inputs are examined
- * @param finishedNodes list from which marked lops are removed
- * @param execNodes lops evaluated for removal; marked lops are removed from it
- * @param queuedNodes list that receives the marked lops
- * @param jobvec per-job lop lists, consulted through <code>jobType</code> and updated through <code>removeNodeByJobType</code>
- * @throws LopsException declared by this method; <code>jobType</code>, which is called here, declares it as well
- */
- @SuppressWarnings("unused")
- private void removeNodesForNextIterationOLD(N node, ArrayList<N> finishedNodes,
- ArrayList<N> execNodes, ArrayList<N> queuedNodes,
- ArrayList<ArrayList<N>> jobvec) throws LopsException {
- // only queued nodes with two inputs need to be handled.
-
- // TODO: statiko -- this should be made == 1
- if (node.getInputs().size() != 2)
- return;
-
-
- //if both children are queued, nothing to do.
- if (queuedNodes.contains(node.getInputs().get(0))
- && queuedNodes.contains(node.getInputs().get(1)))
- return;
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
-
-
-
- // true when both inputs are assigned to a job (jobType != -1) and it is the same job
- boolean inputs_in_same_job = false;
- // TODO: handle tertiary
- if (jobType(node.getInputs().get(0), jobvec) != -1
- && jobType(node.getInputs().get(0), jobvec) == jobType(node
- .getInputs().get(1), jobvec)) {
- inputs_in_same_job = true;
- }
-
- // true when at least one input is not yet assigned to any job (jobType == -1)
- boolean unassigned_inputs = false;
- // inputs with ExecLocation.ControlProgram (scalars, per the original note) are ignored here
- if ((node.getInputs().get(0).getExecLocation() != ExecLocation.ControlProgram && jobType(
- node.getInputs().get(0), jobvec) == -1)
- || (node.getInputs().get(1).getExecLocation() != ExecLocation.ControlProgram && jobType(
- node.getInputs().get(1), jobvec) == -1)) {
- unassigned_inputs = true;
- }
-
- boolean child_queued = false;
-
- // check if at least one child was queued.
- if (queuedNodes.contains(node.getInputs().get(0))
- || queuedNodes.contains(node.getInputs().get(1)))
- child_queued = true;
-
- // nodes to be dropped
- ArrayList<N> markedNodes = new ArrayList<N>();
-
- for (int i = 0; i < execNodes.size(); i++) {
-
- N tmpNode = execNodes.get(i);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Checking for removal (" + tmpNode.getID()
- + ") " + tmpNode.toString());
- LOG.trace("Inputs in same job " + inputs_in_same_job);
- LOG.trace("Unassigned inputs " + unassigned_inputs);
- LOG.trace("Child queued " + child_queued);
-
- }
-
- //if(node.definesMRJob() && isChild(tmpNode,node) && (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
- // continue;
-
- // TODO: statiko -- check if this is too conservative?
- if (child_queued) {
- // if one of the children are queued,
- // remove some child nodes on other leg that may be needed later on.
- // For e.g. Group lop.
-
- // a direct input of node that is an aligner is always marked
- if((tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) &&
- tmpNode.isAligner())
- {
- markedNodes.add(tmpNode);
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
- }
- else {
- if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node)
- && isChild(tmpNode, node, IDMap) && branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-
-
- // NOTE(review): finishedNodes is passed below as the argument that
- // branchCanBePiggyBackedMapAndReduce / branchCanBePiggyBackedReduce
- // name queuedNodes -- confirm this is intended.
- if(
- //e.g. MMCJ
- (node.getExecLocation() == ExecLocation.MapAndReduce &&
- branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes) && !tmpNode.definesMRJob() )
- ||
- //e.g. Binary
- (node.getExecLocation() == ExecLocation.Reduce && branchCanBePiggyBackedReduce(tmpNode, node, execNodes, finishedNodes)) )
- {
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
-
-
- markedNodes.add(tmpNode);
- }
- }
- }
- } else {
- /*
- * "node" has no other queued children.
- *
- * If inputs are in the same job and "node" is of type
- * MapAndReduce, then remove nodes of all types other than
- * Reduce, MapAndReduce, and the ones that define a MR job as
- * they can be piggybacked later.
- *
- * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
- * come from Rand job, and they should not be removed.
- *
- * Other examples: -- MMCJ whose children are of type
- * MapAndReduce (say GMR) -- Inputs coming from two different
- * jobs .. GMR & REBLOCK
- */
- // NOTE(review): here too finishedNodes is passed as the callee's queuedNodes argument -- confirm.
- if ((inputs_in_same_job || unassigned_inputs)
- && node.getExecLocation() == ExecLocation.MapAndReduce
- && !hasOtherMapAndReduceParentNode(tmpNode, execNodes,
- node)
- && isChild(tmpNode, node, IDMap) &&
- branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes)
- && !tmpNode.definesMRJob()) {
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration:: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
-
- markedNodes.add(tmpNode);
- }
-
- // as this node has inputs coming from different jobs, need to
- // free up everything
- // below and include the closest MapAndReduce lop if this is of
- // type Reduce.
- // if it is of type MapAndReduce, don't need to free any nodes
-
- if (!inputs_in_same_job && !unassigned_inputs
- && isChild(tmpNode, node, IDMap) &&
- (tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) &&
- tmpNode.isAligner())
- {
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration ("
- + tmpNode.getID()
- + ") "
- + tmpNode.toString());
-
- markedNodes.add(tmpNode);
-
- }
-
- }
- }
-
- // we also need to delete all parent nodes of marked nodes
- // NOTE(review): there is no markedNodes.contains(...) check before the add below,
- // so a lop may end up in markedNodes (and later in queuedNodes) more than once -- confirm.
- for (int i = 0; i < execNodes.size(); i++) {
- if( LOG.isTraceEnabled() )
- LOG.trace("Checking for removal - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
-
- if (hasChildNode(execNodes.get(i), markedNodes)) {
- markedNodes.add(execNodes.get(i));
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
- }
- }
-
- // delete marked nodes from finishedNodes and execNodes
- // add to queued nodes
- for (int i = 0; i < markedNodes.size(); i++) {
- finishedNodes.remove(markedNodes.get(i));
- execNodes.remove(markedNodes.get(i));
- removeNodeByJobType(markedNodes.get(i), jobvec);
- queuedNodes.add(markedNodes.get(i));
- }
- }
-
- /**
-  * Checks whether the branch starting at <code>tmpNode</code> can be piggybacked
-  * with the Reduce lop <code>node</code>.
-  *
-  * Returns <code>false</code> when <code>node</code> is not a Reduce lop, when
-  * <code>tmpNode</code> lies below an input of <code>node</code> that is queued, when
-  * <code>tmpNode</code> itself (as found in <code>execNodes</code>) is neither Map nor
-  * MapOrReduce, or when <code>tmpNode</code> is not a direct input of <code>node</code>
-  * and some lop on the branch tmpNode->*->node is neither Map nor MapOrReduce.
-  *
-  * @param tmpNode lop being considered
-  * @param node Reduce lop that the branch would be combined with
-  * @param execNodes lops scanned for members of the branch
-  * @param queuedNodes lops used to detect queued inputs of <code>node</code>
-  * @return <code>true</code> if none of the conditions above applies
-  */
- private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
-     if (node.getExecLocation() != ExecLocation.Reduce) {
-         return false;
-     }
-
-     // a branch hanging below a queued input of node can not be piggybacked
-     for (Lop input : node.getInputs()) {
-         boolean inputQueued = queuedNodes.contains(input);
-         if (inputQueued && isChild(tmpNode, input, IDMap)) {
-             return false;
-         }
-     }
-
-     for (N candidate : execNodes) {
-         if (candidate.equals(node)) {
-             continue;
-         }
-
-         ExecLocation where = candidate.getExecLocation();
-         boolean mapSide = (where == ExecLocation.Map || where == ExecLocation.MapOrReduce);
-
-         // tmpNode itself has to be executable in the mapper
-         if (!mapSide && candidate.equals(tmpNode)) {
-             return false;
-         }
-
-         // is candidate located on the branch tmpNode->*->node ?
-         boolean onBranch = isChild(candidate, node, IDMap) && isChild(tmpNode, candidate, IDMap);
-         if (onBranch && !mapSide && !node.getInputs().contains(tmpNode)) {
-             return false;
-         }
-     }
-     return true;
- }
-
- /**
-  * Checks whether <code>tmpNode</code> can be piggybacked with the Map lop <code>node</code>.
-  *
-  * The checks are applied in this order, each one returning <code>false</code> on failure:
-  * <code>node</code> must be a Map lop; <code>tmpNode</code> must not lie below a queued
-  * input of <code>node</code>; <code>tmpNode</code> must not define an MR job and must be
-  * Map or MapOrReduce; <code>tmpNode</code> must not lie below an input that
-  * <code>node</code> reads through the distributed cache; and, if <code>tmpNode</code> uses
-  * the distributed cache, the summed mapper footprint has to pass
-  * <code>checkMemoryLimits</code>. Finally the compatible-job bit masks of both lops
-  * must overlap.
-  *
-  * @param tmpNode lop being considered
-  * @param node Map lop that <code>tmpNode</code> would be combined with
-  * @param execNodes not read by this method
-  * @param queuedNodes lops used to detect queued inputs of <code>node</code>; may be <code>null</code>
-  * @param markedNodes lops whose distributed-cache footprint is added to the estimate; may be <code>null</code>
-  * @return <code>true</code> if all checks pass
-  */
- @SuppressWarnings("unchecked")
- private boolean branchCanBePiggyBackedMap(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes, ArrayList<N> markedNodes) {
-     if (node.getExecLocation() != ExecLocation.Map) {
-         return false;
-     }
-
-     // tmpNode below a queued input of node: branch can not be piggybacked
-     for (Lop input : node.getInputs()) {
-         if (queuedNodes != null && queuedNodes.contains(input) && isChild(tmpNode, input, IDMap)) {
-             return false;
-         }
-     }
-
-     // node runs in the mapper, hence only Map and MapOrReduce lops qualify
-     if (tmpNode.definesMRJob()) {
-         return false;
-     }
-     ExecLocation tmpLocation = tmpNode.getExecLocation();
-     if (tmpLocation != ExecLocation.Map && tmpLocation != ExecLocation.MapOrReduce) {
-         return false;
-     }
-
-     // If some input "dcInput" of node is read via distributed cache and tmpNode lies
-     // below it, tmpNode should not be removed: "dcInput" must be executed prior to
-     // "node", and removal of tmpNode does not make that happen.
-     if (node.usesDistributedCache()) {
-         for (int position : node.distributedCacheInputIndex()) {
-             N dcInput = (N) node.getInputs().get(position - 1);
-             if (isChild(tmpNode, dcInput, IDMap)) {
-                 return false;
-             }
-         }
-     }
-
-     // If tmpNode itself needs an input from distributed cache, it is removed only
-     // if the accumulated footprint passes the memory check.
-     if (tmpNode.usesDistributedCache()) {
-         double required = computeFootprintInMapper(tmpNode);
-         if (node.usesDistributedCache()) {
-             required += computeFootprintInMapper(node);
-         }
-         if (markedNodes != null) {
-             for (N marked : markedNodes) {
-                 if (marked.usesDistributedCache()) {
-                     required += computeFootprintInMapper(marked);
-                 }
-             }
-         }
-         if (!checkMemoryLimits(node, required)) {
-             return false;
-         }
-     }
-
-     return (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) > 0;
- }
-
- /**
-  * Checks whether the branch starting at <code>tmpNode</code> can be piggybacked with
-  * the MapAndReduce lop <code>node</code>.
-  *
-  * Every lop in <code>execNodes</code> that is either <code>tmpNode</code> itself or lies
-  * on the branch tmpNode->..->node is inspected. The result is <code>false</code> if
-  * <code>hasOtherMapAndReduceParentNode(tmpNode, queuedNodes, node)</code> holds (which
-  * includes the case that <code>tmpNode</code> is a MapAndReduce lop), if the inspected
-  * lop is neither Map nor MapOrReduce, or if it is not compatible with the job type
-  * derived from <code>node</code>.
-  *
-  * @param tmpNode lop being considered
-  * @param node MapAndReduce lop that the branch would be combined with
-  * @param execNodes lops scanned for members of the branch
-  * @param queuedNodes lops handed to <code>hasOtherMapAndReduceParentNode</code>
-  * @return <code>true</code> if <code>node</code> is a MapAndReduce lop and no inspected lop fails a check
-  */
- private boolean branchCanBePiggyBackedMapAndReduce(N tmpNode, N node,
-         ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
-
-     if (node.getExecLocation() != ExecLocation.MapAndReduce) {
-         return false;
-     }
-     JobType jt = JobType.findJobTypeFromLop(node);
-
-     for (N candidate : execNodes) {
-         if (candidate.equals(node)) {
-             continue;
-         }
-
-         // only lops on the branch between tmpNode->..->node are evaluated
-         boolean onBranch = candidate.equals(tmpNode)
-                 || (isChild(candidate, node, IDMap) && isChild(tmpNode, candidate, IDMap));
-         if (!onBranch) {
-             continue;
-         }
-
-         if (hasOtherMapAndReduceParentNode(tmpNode, queuedNodes, node)) {
-             return false;
-         }
-
-         ExecLocation where = candidate.getExecLocation();
-         boolean mapSide = (where == ExecLocation.Map || where == ExecLocation.MapOrReduce);
-         if (!mapSide || !isCompatible(candidate, jt)) {
-             return false;
-         }
-     }
-     return true;
- }
-
- /**
-  * Checks that neither <code>tmpNode</code> nor any lop on the branch between
-  * <code>tmpNode</code> and <code>node</code> feeds more than one unfinished output.
-  *
-  * For <code>tmpNode</code> the outputs are only counted when it has more than one
-  * output. For every lop of <code>execNodes</code> strictly between <code>tmpNode</code>
-  * and <code>node</code> the number of outputs missing from <code>finishedNodes</code>
-  * has to be exactly one.
-  *
-  * @param tmpNode lower end of the branch
-  * @param node upper end of the branch
-  * @param execNodes lops scanned for members of the branch
-  * @param finishedNodes lops regarded as finished
-  * @return <code>false</code> as soon as one count differs from one, <code>true</code> otherwise
-  */
- private boolean branchHasNoOtherUnExecutedParents(N tmpNode, N node,
-         ArrayList<N> execNodes, ArrayList<N> finishedNodes) {
-
-     // tmpNode with several outputs: exactly one of them may be unfinished
-     if (tmpNode.getOutputs().size() > 1) {
-         int pending = 0;
-         for (Lop consumer : tmpNode.getOutputs()) {
-             if (!finishedNodes.contains(consumer)) {
-                 pending++;
-             }
-         }
-         if (pending != 1) {
-             return false;
-         }
-     }
-
-     // same requirement for every lop located between tmpNode and node
-     for (N candidate : execNodes) {
-         if (candidate.equals(node) || candidate.equals(tmpNode)) {
-             continue;
-         }
-         if (!(isChild(candidate, node, IDMap) && isChild(tmpNode, candidate, IDMap))) {
-             continue;
-         }
-
-         int pending = 0;
-         for (Lop consumer : candidate.getOutputs()) {
-             if (!finishedNodes.contains(consumer)) {
-                 pending++;
-             }
-         }
-         if (pending != 1) {
-             return false;
-         }
-     }
-
-     return true;
- }
-
- /**
-  * Recursively follows the outputs of <code>tmpNode</code> and reports whether one of
-  * the lops reached this way is contained in <code>execNodes</code>, has exec location
-  * MapAndReduce and satisfies <code>isChild(lop, node, IDMap)</code>.
-  *
-  * @param tmpNode lop whose outputs are followed
-  * @param execNodes lops a match has to be contained in
-  * @param node lop passed as second argument to <code>isChild</code>
-  * @return <code>true</code> if such a lop is found, <code>false</code> otherwise
-  */
-
- @SuppressWarnings({ "unchecked", "unused" })
- private boolean hasMapAndReduceParentNode(N tmpNode, ArrayList<N> execNodes, N node)
- {
-     for (Lop output : tmpNode.getOutputs()) {
-         N parent = (N) output;
-
-         boolean matches = execNodes.contains(parent)
-                 && parent.getExecLocation() == ExecLocation.MapAndReduce
-                 && isChild(parent, node, IDMap);
-
-         // no direct match: continue the search above this output
-         if (matches || hasMapAndReduceParentNode(parent, execNodes, node)) {
-             return true;
-         }
-     }
-
-     return false;
- }
-
- /**
-  * Looks up the job whose lop list contains the given lop.
-  *
-  * Only job types with an id greater than zero are consulted; the id is used as
-  * the index into <code>jobvec</code>.
-  *
-  * @param lops lop to look for
-  * @param jobvec lop lists, indexed by job type id
-  * @return id of the first job type whose list contains the lop, or -1 if there is none
-  * @throws LopsException declared by the signature
-  */
-
- private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
-     for (JobType candidate : JobType.values()) {
-         int index = candidate.getId();
-         if (index <= 0) {
-             continue;
-         }
-         ArrayList<N> members = jobvec.get(index);
-         if (members != null && members.contains(lops)) {
-             return index;
-         }
-     }
-     return -1;
- }
-
- /**
-  * Checks for a MapAndReduce lop on the way from <code>tmpNode</code> up to
-  * <code>node</code>, considering only lops contained in <code>nodeList</code>.
-  *
-  * A <code>tmpNode</code> that is itself a MapAndReduce lop yields <code>true</code>
-  * right away. Otherwise the first output of <code>tmpNode</code> that is contained in
-  * <code>nodeList</code> and satisfies <code>isChild(output, node, IDMap)</code> decides
-  * the result: <code>true</code> if it is a MapAndReduce lop other than <code>node</code>,
-  * else the outcome of the recursive call on that output.
-  * NOTE(review): later outputs are not examined once such an output was found --
-  * confirm that this is intended.
-  *
-  * @param tmpNode lop at which the search starts
-  * @param nodeList lops that may be traversed
-  * @param node lop at which the search ends
-  * @return <code>true</code> if a MapAndReduce lop is found as described above
-  */
-
- @SuppressWarnings("unchecked")
- private boolean hasOtherMapAndReduceParentNode(N tmpNode,
-         ArrayList<N> nodeList, N node) {
-
-     if (tmpNode.getExecLocation() == ExecLocation.MapAndReduce) {
-         return true;
-     }
-
-     for (Lop output : tmpNode.getOutputs()) {
-         N parent = (N) output;
-
-         if (!nodeList.contains(parent) || !isChild(parent, node, IDMap)) {
-             continue;
-         }
-
-         boolean otherMapAndReduce = !parent.equals(node)
-                 && parent.getExecLocation() == ExecLocation.MapAndReduce;
-         return otherMapAndReduce || hasOtherMapAndReduceParentNode(parent, nodeList, node);
-     }
-
-     return false;
- }
-
- /**
-  * Checks whether some queued lop, different from both <code>tmpNode</code> and
-  * <code>node</code>, is flagged in the reachable arrays of both of them.
-  * (Presumably a set flag means that the queued lop is a parent -- TODO confirm
-  * against <code>get_reachable</code>.)
-  *
-  * @param tmpNode first lop
-  * @param queuedNodes queued lops to test
-  * @param node second lop
-  * @return <code>true</code> if such a queued lop exists; <code>false</code> otherwise,
-  *         in particular when <code>queuedNodes</code> is empty
-  */
- private boolean hasOtherQueuedParentNode(N tmpNode, ArrayList<N> queuedNodes, N node) {
-     if (queuedNodes.isEmpty()) {
-         return false;
-     }
-
-     boolean[] flagsOfNode = node.get_reachable();
-     boolean[] flagsOfTmp = tmpNode.get_reachable();
-     long indexOfNode = IDMap.get(node.getID());
-     long indexOfTmp = IDMap.get(tmpNode.getID());
-
-     for (N queued : queuedNodes) {
-         int index = IDMap.get(queued.getID());
-
-         if (index == indexOfNode || !flagsOfNode[index]) {
-             continue;
-         }
-         if (index == indexOfTmp || !flagsOfTmp[index]) {
-             continue;
-         }
-         return true;
-     }
-
-     return false;
- }
-
- /**
-  * Writes the lops of every non-empty job to the trace log, grouped by job type.
-  * Nothing is done unless trace logging is enabled.
-  *
-  * @param jobNodes lop lists, indexed by job type id
-  * @throws DMLRuntimeException declared by the signature
-  */
- void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
-         throws DMLRuntimeException {
-     if (!LOG.isTraceEnabled()) {
-         return;
-     }
-
-     for (JobType jt : JobType.values()) {
-         int index = jt.getId();
-         if (index <= 0) {
-             continue;
-         }
-
-         ArrayList<N> members = jobNodes.get(index);
-         if (members == null || members.isEmpty()) {
-             continue;
-         }
-
-         LOG.trace(jt.getName() + " Job Nodes:");
-         for (N member : members) {
-             LOG.trace(" " + member.getID() + ") " + member.toString());
-         }
-     }
- }
-
- /**
-  * Checks whether the given list contains at least one lop with the requested
-  * exec location.
-  *
-  * The requested location <code>loc</code> is now honoured. Previously the argument
-  * was ignored and the comparison was hard-coded to
-  * <code>ExecLocation.RecordReader</code>; callers passing
-  * <code>ExecLocation.RecordReader</code> get the same result as before.
-  *
-  * @param nodes lops to inspect
-  * @param loc exec location to look for
-  * @return <code>true</code> if some lop in <code>nodes</code> has exec location <code>loc</code>
-  */
- boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
-     for (N candidate : nodes) {
-         if (candidate.getExecLocation() == loc) {
-             return true;
-         }
-     }
-     return false;
- }
-
- /**
-  * Splits the given GMR lops into one lop list per RecordReader lop plus one
-  * trailing list for the remaining lops.
-  *
-  * List <code>k</code> starts with the k-th RecordReader lop found in
-  * <code>gmrnodes</code>, followed by every lop <code>x</code> of <code>gmrnodes</code>
-  * for which <code>isChild(recordReader, x, IDMap)</code> holds. Since each RecordReader
-  * lop is processed independently, a lop may be added to more than one list. Lops that
-  * were added to none of these lists are collected in the last list.
-  *
-  * @param gmrnodes lops to split
-  * @return lists obtained from <code>createNodeVectors</code>, filled as described
-  */
- ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) {
-
-     // collect the record reader lops
-     ArrayList<N> readers = new ArrayList<N>();
-     for (N lop : gmrnodes) {
-         if (lop.getExecLocation() == ExecLocation.RecordReader) {
-             readers.add(lop);
-         }
-     }
-
-     // one list per record reader, and one extra list for lops that do not
-     // depend on any record reader lop
-     ArrayList<ArrayList<N>> splitGMR = createNodeVectors(readers.size() + 1);
-
-     // assigned[j] tells whether gmrnodes.get(j) was placed in one of the lists;
-     // a new boolean array starts out with all elements false
-     boolean[] assigned = new boolean[gmrnodes.size()];
-
-     // first, gather each record reader lop together with the lops above it
-     for (int r = 0; r < readers.size(); r++) {
-         N reader = readers.get(r);
-         splitGMR.get(r).add(reader);
-
-         for (int j = 0; j < gmrnodes.size(); j++) {
-             N lop = gmrnodes.get(j);
-             if (reader.equals(lop)) {
-                 assigned[j] = true;
-             }
-             else if (isChild(reader, lop, IDMap)) {
-                 splitGMR.get(r).add(lop);
-                 assigned[j] = true;
-             }
-         }
-     }
-
-     // everything that is left goes into the last list
-     int lastIndex = readers.size();
-     for (int j = 0; j < gmrnodes.size(); j++) {
-         if (!assigned[j]) {
-             splitGMR.get(lastIndex).add(gmrnodes.get(j));
-         }
-     }
-
-     return splitGMR;
- }
-
- /**
- * Method to generate hadoop jobs. Exec nodes can contains a mixture of node
- * types requiring different mr jobs. This method breaks the job into
- * sub-types and then invokes the appropriate method to generate
- * instructions.
- *
- * @param execNodes
- * @param inst
- * @param deleteinst
- * @param
<TRUNCATED>